// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
use crate::commons::{Block, ConsensusError, Database, RoundUpdate};
use crate::contract_state::Operations;
use crate::phase::Phase;
use crate::agreement::step;
use crate::execution_ctx::ExecutionCtx;
use crate::messages::Message;
use crate::queue::Queue;
use crate::user::provisioners::Provisioners;
use crate::util::pending_queue::PendingQueue;
use crate::{config, selection};
use crate::{firststep, secondstep};
use tracing::{error, Instrument};
use std::sync::Arc;
use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
/// Consensus round orchestrator.
///
/// Owns the message queues and shared state needed to drive one consensus
/// round: a step-wise generation/selection/reduction loop running
/// concurrently with a round-wise Agreement loop.
pub struct Consensus<T: Operations, D: Database> {
    /// inbound is a queue of messages that comes from outside world
    inbound: PendingQueue,
    /// outbound_msgs is a queue of messages, this consensus instance shares
    /// with the outside world.
    outbound: PendingQueue,
    /// future_msgs is a queue of messages read from inbound_msgs queue. These
    /// msgs are pending to be handled in a future round/step.
    future_msgs: Arc<Mutex<Queue<Message>>>,
    /// agreement_layer implements Agreement message handler within the context
    /// of a separate task execution.
    agreement_process: step::Agreement,
    /// Reference to the executor of any EST-related call.
    /// Shared behind `Arc<Mutex<_>>` so the spawned main-loop task can use it.
    executor: Arc<Mutex<T>>,
    // Database handle, shared with the Agreement task and the main-loop task
    // (see `spin` / `spawn_main_loop`).
    db: Arc<Mutex<D>>,
}
impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
/// Creates an instance of Consensus.
///
/// # Arguments
///
/// * `inbound` - a queue of input messages consumed by main loop
/// * `outbound` - a queue of output messages that main loop broadcasts to the outside world
///
/// * `agr_inbound_queue` - a queue of input messages consumed solely by Agreement loop
/// * `agr_outbound_queue` - a queue of output messages that Agreement loop broadcasts to the outside world
pub fn new(
inbound: PendingQueue,
outbound: PendingQueue,
agr_inbound_queue: PendingQueue,
agr_outbound_queue: PendingQueue,
executor: Arc<Mutex<T>>,
db: Arc<Mutex<D>>,
) -> Self {
Self {
inbound,
outbound,
future_msgs: Arc::new(Mutex::new(Queue::default())),
agreement_process: step::Agreement::new(
agr_inbound_queue,
agr_outbound_queue,
),
executor,
db,
}
}
/// Spins the consensus state machine. The consensus runs for the whole round until either a new round is produced or the node needs to re-sync.
///
/// The Agreement loop (acting roundwise) runs concurrently with the generation-selection-reduction loop (acting step-wise).
///
/// # Arguments
///
/// * `provisioner` - a list of the provisioners based on the most recent contract state.
///
/// * `cancel_rx` - a chan that allows the client to drop consensus execution on demand.
pub async fn spin(
&mut self,
ru: RoundUpdate,
mut provisioners: Provisioners,
cancel_rx: oneshot::Receiver<i32>,
) -> Result<Block, ConsensusError> {
// Enable/Disable all members stakes depending on the current round. If
// a stake is not eligible for this round, it's disabled.
provisioners.update_eligibility_flag(ru.round);
// Agreement loop Executes agreement loop in a separate tokio::task to
// collect (aggr)Agreement messages.
let mut agreement_task_handle = self.agreement_process.spawn(
ru.clone(),
provisioners.clone(),
self.db.clone(),
);
// Consensus loop - generation-selection-reduction loop
let mut main_task_handle = self.spawn_main_loop(
ru,
provisioners,
self.agreement_process.inbound_queue.clone(),
);
// Wait for any of the tasks to complete.
let result;
tokio::select! {
recv = &mut agreement_task_handle => {
result = recv.expect("wrong agreement_task handle");
tracing::trace!("agreement result: {:?}", result);
},
recv = &mut main_task_handle => {
result = recv.expect("wrong main_task handle");
tracing::trace!("main_loop result: {:?}", result);
},
// Canceled from outside.
// This could be triggered by Synchronizer or on node termination.
_ = cancel_rx => {
result = Err(ConsensusError::Canceled);
tracing::trace!("consensus canceled");
}
}
// Tear-down procedure
// Delete all candidates
self.db.lock().await.delete_candidate_blocks();
// Cancel all tasks
agreement_task_handle.abort();
main_task_handle.abort();
result
}
fn spawn_main_loop(
&mut self,
ru: RoundUpdate,
mut provisioners: Provisioners,
mut agr_inbound_queue: PendingQueue,
) -> JoinHandle<Result<Block, ConsensusError>> {
let inbound = self.inbound.clone();
let outbound = self.outbound.clone();
let future_msgs = self.future_msgs.clone();
let executor = self.executor.clone();
let db = self.db.clone();
tokio::spawn(async move {
if ru.round > 0 {
future_msgs.lock().await.clear_round(ru.round - 1);
}
let mut phases = [
Phase::Selection(selection::step::Selection::new(
executor.clone(),
db.clone(),
)),
Phase::Reduction1(firststep::step::Reduction::new(
executor.clone(),
)),
Phase::Reduction2(secondstep::step::Reduction::new(executor)),
];
// Consensus loop
// Initialize and run consensus loop
let mut step: u8 = 0;
loop {
let mut msg = Message::empty();
// Execute a single iteration
for phase in phases.iter_mut() {
step += 1;
// Initialize new phase with message returned by previous phase.
phase.initialize(&msg, ru.round, step);
// Construct phase execution context
let ctx = ExecutionCtx::new(
inbound.clone(),
outbound.clone(),
future_msgs.clone(),
&mut provisioners,
ru.clone(),
step,
);
// Execute a phase.
// An error returned here terminates consensus
// round. This normally happens if consensus channel is cancelled by
// agreement loop on finding the winning block for this round.
msg = phase
.run(ctx)
.instrument(tracing::info_span!(
"main_task",
round = ru.round,
step = step,
pubkey = ru.pubkey_bls.encode_short_hex(),
))
.await?;
if step >= config::CONSENSUS_MAX_STEP {
return Err(ConsensusError::MaxStepReached);
}
}
// Delegate (agreement) message result to agreement loop for
// further processing.
Self::send_agreement(&mut agr_inbound_queue, msg.clone()).await;
}
})
}
async fn send_agreement(
agr_inbound_queue: &mut PendingQueue,
msg: Message,
) {
let _ = agr_inbound_queue
.send(msg.clone())
.await
.map_err(|e| error!("send agreement failed with {:?}", e));
}
}