use crate::{
simplex::types::{Activity, Finalization, Notarization},
types::{Height, Round},
Block, Heightable, Reporter,
};
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_storage::archive;
use commonware_utils::{channels::fallible::AsyncFallibleExt, vec::NonEmptyVec};
use futures::{
channel::{mpsc, oneshot},
future::BoxFuture,
stream::{FuturesOrdered, Stream},
FutureExt,
};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Selects the block a request refers to.
///
/// `D` is the digest type used for block commitments.
pub enum Identifier<D: Digest> {
/// Select the block at the given height.
Height(Height),
/// Select the block with the given commitment.
Commitment(D),
/// Select the "latest" block.
///
/// NOTE(review): presumably the most recent block known to the handler of the request
/// (e.g. the highest finalized one) — confirm against the code that processes it.
Latest,
}
impl<D: Digest> From<Height> for Identifier<D> {
fn from(src: Height) -> Self {
Self::Height(src)
}
}
impl<D: Digest> From<&D> for Identifier<D> {
fn from(src: &D) -> Self {
Self::Commitment(*src)
}
}
impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
fn from(src: archive::Identifier<'_, D>) -> Self {
match src {
archive::Identifier::Index(index) => Self::Height(Height::new(index)),
archive::Identifier::Key(key) => Self::Commitment(*key),
}
}
}
/// Messages sent through a [`Mailbox`] to whatever task owns the receiving half of the
/// channel.
///
/// Variants carrying a `response` sender expect an answer on that one-shot channel; the
/// remaining variants are fire-and-forget.
pub(crate) enum Message<S: Scheme, B: Block> {
/// Request the height and commitment of the block selected by `identifier`.
GetInfo {
/// Which block to look up.
identifier: Identifier<B::Commitment>,
/// Channel for the answer; `None` presumably means the block is not known — confirm
/// against the handler.
response: oneshot::Sender<Option<(Height, B::Commitment)>>,
},
/// Request the block selected by `identifier`.
GetBlock {
/// Which block to look up.
identifier: Identifier<B::Commitment>,
/// Channel for the answer; `None` presumably means the block is not known — confirm
/// against the handler.
response: oneshot::Sender<Option<B>>,
},
/// Request the finalization stored for `height`.
GetFinalization {
/// Height whose finalization is wanted.
height: Height,
/// Channel for the answer; `None` presumably means no finalization is available —
/// confirm against the handler.
response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
},
/// Hint about a finalized height.
///
/// NOTE(review): by its name this tells the handler that `height` is finalized and that
/// `targets` are peers it may be obtained from — confirm against the handler.
HintFinalized {
/// Height the hint refers to.
height: Height,
/// Non-empty list of public keys associated with the hint.
targets: NonEmptyVec<S::PublicKey>,
},
/// Ask to be sent the block with the given commitment.
///
/// Unlike the `Get*` requests, the response channel carries a `B` rather than an `Option`.
Subscribe {
/// Optional round associated with the block; how it is used is up to the handler.
round: Option<Round>,
/// Commitment of the wanted block.
commitment: B::Commitment,
/// Channel on which the block is delivered.
response: oneshot::Sender<B>,
},
/// Hand over a block together with its round (sent by [`Mailbox::proposed`]).
Proposed {
/// Round the block belongs to.
round: Round,
/// The block itself.
block: B,
},
/// Hand over a block together with its round (sent by [`Mailbox::verified`]).
Verified {
/// Round the block belongs to.
round: Round,
/// The block itself.
block: B,
},
/// Set a floor height (sent by [`Mailbox::set_floor`]).
///
/// NOTE(review): the exact meaning of the floor is defined by the handler — confirm there.
SetFloor {
/// The floor height.
height: Height,
},
/// A notarization forwarded from the consensus engine via [`Reporter::report`].
Notarization {
/// The forwarded notarization.
notarization: Notarization<S, B::Commitment>,
},
/// A finalization forwarded from the consensus engine via [`Reporter::report`].
Finalization {
/// The forwarded finalization.
finalization: Finalization<S, B::Commitment>,
},
}
/// Cloneable handle used to send [`Message`]s over an mpsc channel.
///
/// Each clone holds its own copy of the channel sender.
#[derive(Clone)]
pub struct Mailbox<S: Scheme, B: Block> {
// Sending half of the channel; the receiving half is held elsewhere.
sender: mpsc::Sender<Message<S, B>>,
}
impl<S: Scheme, B: Block> Mailbox<S, B> {
pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
Self { sender }
}
pub async fn get_info(
&mut self,
identifier: impl Into<Identifier<B::Commitment>>,
) -> Option<(Height, B::Commitment)> {
let identifier = identifier.into();
self.sender
.request(|response| Message::GetInfo {
identifier,
response,
})
.await
.flatten()
}
pub async fn get_block(
&mut self,
identifier: impl Into<Identifier<B::Commitment>>,
) -> Option<B> {
let identifier = identifier.into();
self.sender
.request(|response| Message::GetBlock {
identifier,
response,
})
.await
.flatten()
}
pub async fn get_finalization(
&mut self,
height: Height,
) -> Option<Finalization<S, B::Commitment>> {
self.sender
.request(|response| Message::GetFinalization { height, response })
.await
.flatten()
}
pub async fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
self.sender
.send_lossy(Message::HintFinalized { height, targets })
.await;
}
pub async fn subscribe(
&mut self,
round: Option<Round>,
commitment: B::Commitment,
) -> oneshot::Receiver<B> {
let (tx, rx) = oneshot::channel();
self.sender
.send_lossy(Message::Subscribe {
round,
commitment,
response: tx,
})
.await;
rx
}
pub async fn ancestry(
&mut self,
(start_round, start_commitment): (Option<Round>, B::Commitment),
) -> Option<AncestorStream<S, B>> {
self.subscribe(start_round, start_commitment)
.await
.await
.ok()
.map(|block| AncestorStream::new(self.clone(), [block]))
}
pub async fn proposed(&mut self, round: Round, block: B) {
self.sender
.send_lossy(Message::Proposed { round, block })
.await;
}
pub async fn verified(&mut self, round: Round, block: B) {
self.sender
.send_lossy(Message::Verified { round, block })
.await;
}
pub async fn set_floor(&mut self, height: Height) {
self.sender.send_lossy(Message::SetFloor { height }).await;
}
}
/// Lets the mailbox act as a consensus [`Reporter`], forwarding certificates as messages.
impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
    type Activity = Activity<S, B::Commitment>;

    /// Forwards notarizations and finalizations over the channel.
    ///
    /// Every other kind of activity is ignored. The outcome of the send is not reported to
    /// the caller.
    async fn report(&mut self, activity: Self::Activity) {
        // Translate the activity into a message, if it is one we care about.
        let forwarded = match activity {
            Activity::Notarization(notarization) => Some(Message::Notarization { notarization }),
            Activity::Finalization(finalization) => Some(Message::Finalization { finalization }),
            _ => None,
        };

        if let Some(message) = forwarded {
            self.sender.send_lossy(message).await;
        }
    }
}
/// Returns a boxed future that subscribes (with no round) to the block identified by
/// `commitment` and resolves to it.
///
/// The future resolves to `None` when the subscription's receiver yields an error instead of
/// a block. It takes the mailbox by value so that it is `'static`.
#[inline]
fn subscribe_block_future<S: Scheme, B: Block>(
    mut marshal: Mailbox<S, B>,
    commitment: B::Commitment,
) -> BoxFuture<'static, Option<B>> {
    let fetch = async move { marshal.subscribe(None, commitment).await.await.ok() };
    fetch.boxed()
}
/// Stream of blocks that walks backwards through a chain by following parent commitments.
///
/// Blocks are yielded from the highest buffered height downwards; once the buffer is drained,
/// the parent of the last yielded block is fetched through the mailbox.
#[pin_project]
pub struct AncestorStream<S: Scheme, B: Block> {
// Mailbox used to subscribe to parent blocks.
marshal: Mailbox<S, B>,
// Blocks ready to be yielded, sorted by ascending height so `pop` returns the highest.
buffered: Vec<B>,
// In-flight parent subscriptions; each resolves to `None` if its subscription fails.
#[pin]
pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
}
impl<S: Scheme, B: Block> AncestorStream<S, B> {
    /// Creates a stream seeded with `initial`, fetching further ancestors through `marshal`.
    ///
    /// The initial blocks may be supplied in any order; they are sorted by height here.
    ///
    /// # Panics
    ///
    /// Panics if the sorted initial blocks are not contiguous in height, i.e. if any block's
    /// height is not the successor of the height of the block before it.
    pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
        let mut buffered: Vec<B> = initial.into_iter().collect();
        // Ascending order, so popping from the back yields the highest block first.
        buffered.sort_by_key(Heightable::height);

        for pair in buffered.windows(2) {
            let (lower, upper) = (&pair[0], &pair[1]);
            assert_eq!(
                lower.height().next(),
                upper.height(),
                "initial blocks must be contiguous in height"
            );
        }

        let pending = FuturesOrdered::new();
        Self {
            marshal,
            buffered,
            pending,
        }
    }
}
/// Yields blocks from the highest buffered height downwards, fetching parents on demand.
impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
    type Item = B;

    /// Yields the next block, preferring buffered blocks over pending fetches.
    ///
    /// The stream ends when there is nothing buffered and either no fetch is outstanding or
    /// the outstanding fetch resolved without a block.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Blocks at or below this height never trigger a parent fetch.
        // NOTE(review): presumably because the parent of height 1 cannot be served — confirm.
        const END_BOUND: Height = Height::new(1);

        let mut this = self.project();

        // Take the next block from the buffer, falling back to the in-flight fetches.
        let block = match this.buffered.pop() {
            Some(block) => block,
            None => match this.pending.as_mut().poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None | Some(None)) => return Poll::Ready(None),
                Poll::Ready(Some(Some(block))) => block,
            },
        };

        // Only the lowest block in hand queues a fetch for its parent. When the block came
        // from `pending`, the buffer is necessarily empty, so the check covers both sources.
        if block.height() > END_BOUND && this.buffered.is_empty() {
            let fetch = subscribe_block_future(this.marshal.clone(), block.parent());
            this.pending.push_back(fetch);

            // Poll right away to start the fetch; if the parent is already available, keep
            // it for the next call.
            if let Poll::Ready(Some(Some(parent))) = this.pending.as_mut().poll_next(cx) {
                this.buffered.push(parent);
            }
        }

        Poll::Ready(Some(block))
    }
}