use crossbeam_channel as channel;
use tendermint::evidence::{ConflictingHeadersEvidence, Evidence};
use crate::bail;
use crate::errors::{Error, ErrorKind};
use crate::evidence::EvidenceReporter;
use crate::fork_detector::{Fork, ForkDetection, ForkDetector};
use crate::light_client::LightClient;
use crate::peer_list::PeerList;
use crate::state::State;
use crate::types::{Height, LatestStatus, LightBlock, PeerId, Status};
/// Operations that can be requested from a supervisor.
///
/// Implementors must be `Send + Sync`. Every method returns a `Result`, so
/// a failure to carry out the request is reported to the caller as an `Error`.
pub trait Handle: Send + Sync {
/// Returns the latest trusted light block, or `None` if there is none.
fn latest_trusted(&self) -> Result<Option<LightBlock>, Error>;
/// Returns the latest status.
fn latest_status(&self) -> Result<LatestStatus, Error>;
/// Verifies to the highest block and returns the resulting light block.
fn verify_to_highest(&self) -> Result<LightBlock, Error>;
/// Verifies to the block at the given height and returns the resulting light block.
fn verify_to_target(&self, _height: Height) -> Result<LightBlock, Error>;
/// Requests termination.
fn terminate(&self) -> Result<(), Error>;
}
/// Requests sent over the supervisor's channel and handled by `Supervisor::run`.
///
/// Every variant carries the sending half of a channel on which the
/// supervisor sends its reply.
#[derive(Debug)]
enum HandleInput {
/// Stop the event loop; `()` is sent back as an acknowledgement before the loop returns.
Terminate(channel::Sender<()>),
/// Verify to the highest block and send back the outcome.
VerifyToHighest(channel::Sender<Result<LightBlock, Error>>),
/// Verify to the block at the given height and send back the outcome.
VerifyToTarget(Height, channel::Sender<Result<LightBlock, Error>>),
/// Send back the latest trusted block of the primary, if any.
LatestTrusted(channel::Sender<Option<LightBlock>>),
/// Send back the latest status.
GetStatus(channel::Sender<LatestStatus>),
}
/// A light client bundled together with the state it operates on.
///
/// The supervisor keeps one `Instance` per peer in its `PeerList`.
#[derive(Debug)]
pub struct Instance {
/// The light client used to perform verification.
pub light_client: LightClient,
/// The state handed to the light client; it holds the light store
/// consulted by `latest_trusted` and updated by `trust_block`.
pub state: State,
}
impl Instance {
    /// Builds an instance out of a light client and the state it will work on.
    pub fn new(light_client: LightClient, state: State) -> Self {
        Self { light_client, state }
    }

    /// Looks up the latest block stored with the `Trusted` status in this
    /// instance's light store, returning `None` when there is no such block.
    pub fn latest_trusted(&self) -> Option<LightBlock> {
        let store = &self.state.light_store;
        store.latest(Status::Trusted)
    }

    /// Records the given block in this instance's light store with the
    /// `Trusted` status.
    pub fn trust_block(&mut self, lb: &LightBlock) {
        let store = &mut self.state.light_store;
        store.update(lb, Status::Trusted);
    }
}
/// Drives verification through a primary peer and cross-checks the result
/// against witness peers, serving requests received over a channel.
pub struct Supervisor {
/// The primary and witness instances.
peers: PeerList<Instance>,
/// Used to compare a verified block against what the witnesses report.
fork_detector: Box<dyn ForkDetector>,
/// Used to report evidence when a fork is found.
evidence_reporter: Box<dyn EvidenceReporter>,
/// Sending half of the request channel; cloned into every handle.
sender: channel::Sender<HandleInput>,
/// Receiving half of the request channel; drained by `run`.
receiver: channel::Receiver<HandleInput>,
}
impl std::fmt::Debug for Supervisor {
    /// Formats the supervisor as a struct named `Supervisor` showing only
    /// its `peers` field; the remaining fields are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Supervisor");
        repr.field("peers", &self.peers);
        repr.finish()
    }
}
// Compile-time assertion that `Supervisor` implements `Send`.
static_assertions::assert_impl_all!(Supervisor: Send);
impl Supervisor {
pub fn new(
peers: PeerList<Instance>,
fork_detector: impl ForkDetector + 'static,
evidence_reporter: impl EvidenceReporter + 'static,
) -> Self {
let (sender, receiver) = channel::unbounded::<HandleInput>();
Self {
peers,
sender,
receiver,
fork_detector: Box::new(fork_detector),
evidence_reporter: Box::new(evidence_reporter),
}
}
pub fn handle(&self) -> SupervisorHandle {
SupervisorHandle::new(self.sender.clone())
}
pub fn latest_trusted(&self) -> Option<LightBlock> {
self.peers.primary().latest_trusted()
}
pub fn verify_to_highest(&mut self) -> Result<LightBlock, Error> {
self.verify(None)
}
fn latest_status(&mut self) -> LatestStatus {
let latest_trusted = self.peers.primary().latest_trusted();
let mut connected_nodes: Vec<PeerId> = Vec::new();
connected_nodes.push(self.peers.primary_id());
connected_nodes.append(&mut self.peers.witnesses_ids().iter().copied().collect());
match latest_trusted {
Some(trusted) => LatestStatus::new(
Some(trusted.signed_header.header.height.value()),
Some(trusted.signed_header.header.hash()),
Some(trusted.next_validators.hash()),
connected_nodes,
),
None => LatestStatus::new(None, None, None, connected_nodes),
}
}
pub fn verify_to_target(&mut self, height: Height) -> Result<LightBlock, Error> {
self.verify(Some(height))
}
fn verify(&mut self, height: Option<Height>) -> Result<LightBlock, Error> {
let primary = self.peers.primary_mut();
let verdict = match height {
None => primary.light_client.verify_to_highest(&mut primary.state),
Some(height) => primary
.light_client
.verify_to_target(height, &mut primary.state),
};
match verdict {
Ok(verified_block) => {
let trusted_block = primary
.latest_trusted()
.ok_or(ErrorKind::NoTrustedState(Status::Trusted))?;
let outcome = self.detect_forks(&verified_block, &trusted_block)?;
match outcome {
ForkDetection::Detected(forks) => {
let forked = self.process_forks(forks)?;
if !forked.is_empty() {
bail!(ErrorKind::ForkDetected(forked))
}
self.verify(height)
}
ForkDetection::NotDetected => {
self.peers.primary_mut().trust_block(&verified_block);
Ok(verified_block)
}
}
}
Err(err) => {
self.peers.replace_faulty_primary(Some(err))?;
self.verify(height)
}
}
}
fn process_forks(&mut self, forks: Vec<Fork>) -> Result<Vec<PeerId>, Error> {
let mut forked = Vec::with_capacity(forks.len());
for fork in forks {
match fork {
Fork::Forked { primary, witness } => {
let provider = witness.provider;
self.report_evidence(provider, &primary, &witness)?;
forked.push(provider);
}
Fork::Timeout(provider, _error) => {
self.peers.replace_faulty_witness(provider);
}
Fork::Faulty(block, _error) => {
self.peers.replace_faulty_witness(block.provider);
}
}
}
Ok(forked)
}
fn report_evidence(
&mut self,
provider: PeerId,
primary: &LightBlock,
witness: &LightBlock,
) -> Result<(), Error> {
let evidence = ConflictingHeadersEvidence::new(
primary.signed_header.clone(),
witness.signed_header.clone(),
);
self.evidence_reporter
.report(Evidence::ConflictingHeaders(Box::new(evidence)), provider)
.map_err(ErrorKind::Io)?;
Ok(())
}
fn detect_forks(
&self,
verified_block: &LightBlock,
trusted_block: &LightBlock,
) -> Result<ForkDetection, Error> {
if self.peers.witnesses_ids().is_empty() {
bail!(ErrorKind::NoWitnesses);
}
let witnesses = self
.peers
.witnesses_ids()
.iter()
.filter_map(|id| self.peers.get(id))
.collect();
self.fork_detector
.detect_forks(verified_block, &trusted_block, witnesses)
}
pub fn run(mut self) -> Result<(), Error> {
loop {
let event = self.receiver.recv().map_err(ErrorKind::from)?;
match event {
HandleInput::LatestTrusted(sender) => {
let outcome = self.latest_trusted();
sender.send(outcome).map_err(ErrorKind::from)?;
}
HandleInput::Terminate(sender) => {
sender.send(()).map_err(ErrorKind::from)?;
return Ok(());
}
HandleInput::VerifyToTarget(height, sender) => {
let outcome = self.verify_to_target(height);
sender.send(outcome).map_err(ErrorKind::from)?;
}
HandleInput::VerifyToHighest(sender) => {
let outcome = self.verify_to_highest();
sender.send(outcome).map_err(ErrorKind::from)?;
}
HandleInput::GetStatus(sender) => {
let outcome = self.latest_status();
sender.send(outcome).map_err(ErrorKind::from)?;
}
}
}
}
}
/// A cloneable handle that forwards requests to a supervisor over a channel.
#[derive(Clone)]
pub struct SupervisorHandle {
/// Sending half of the supervisor's request channel.
sender: channel::Sender<HandleInput>,
}
impl SupervisorHandle {
    /// Wraps the sending half of a supervisor's request channel.
    fn new(sender: channel::Sender<HandleInput>) -> Self {
        Self { sender }
    }

    /// Sends a verification request and blocks until its outcome arrives.
    ///
    /// `make_event` receives the sender on which the reply is expected and
    /// turns it into the request to submit.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or the reply cannot be
    /// received; otherwise the verification outcome itself is returned.
    fn verify(
        &self,
        make_event: impl FnOnce(channel::Sender<Result<LightBlock, Error>>) -> HandleInput,
    ) -> Result<LightBlock, Error> {
        // Single-slot channel dedicated to the reply for this one request.
        let (reply_tx, reply_rx) = channel::bounded(1);

        self.sender
            .send(make_event(reply_tx))
            .map_err(ErrorKind::from)?;

        // The received value is already the `Result` to hand back.
        reply_rx.recv().map_err(ErrorKind::from)?
    }
}
impl Handle for SupervisorHandle {
fn latest_trusted(&self) -> Result<Option<LightBlock>, Error> {
let (sender, receiver) = channel::bounded::<Option<LightBlock>>(1);
self.sender
.send(HandleInput::LatestTrusted(sender))
.map_err(ErrorKind::from)?;
Ok(receiver.recv().map_err(ErrorKind::from)?)
}
fn latest_status(&self) -> Result<LatestStatus, Error> {
let (sender, receiver) = channel::bounded::<LatestStatus>(1);
self.sender
.send(HandleInput::GetStatus(sender))
.map_err(ErrorKind::from)?;
Ok(receiver.recv().map_err(ErrorKind::from)?)
}
fn verify_to_highest(&self) -> Result<LightBlock, Error> {
self.verify(HandleInput::VerifyToHighest)
}
fn verify_to_target(&self, height: Height) -> Result<LightBlock, Error> {
self.verify(|sender| HandleInput::VerifyToTarget(height, sender))
}
fn terminate(&self) -> Result<(), Error> {
let (sender, receiver) = channel::bounded::<()>(1);
self.sender
.send(HandleInput::Terminate(sender))
.map_err(ErrorKind::from)?;
Ok(receiver.recv().map_err(ErrorKind::from)?)
}
}