use crate::{marshal::Update, types::Height, Block, Reporter};
use commonware_utils::{
acknowledgement::Exact,
sync::{Mutex, Notify},
Acknowledgement,
};
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
#[derive(Clone)]
pub struct Application<B: Block> {
blocks: Arc<Mutex<BTreeMap<Height, B>>>,
#[allow(clippy::type_complexity)]
tip: Arc<Mutex<Option<(Height, B::Digest)>>>,
pending_acks: Arc<Mutex<VecDeque<(Height, Exact)>>>,
notify: Arc<Notify>,
auto_ack: bool,
}
impl<B: Block> Default for Application<B> {
fn default() -> Self {
Self {
blocks: Default::default(),
tip: Default::default(),
pending_acks: Default::default(),
notify: Arc::new(Notify::new()),
auto_ack: true,
}
}
}
impl<B: Block> Application<B> {
pub fn manual_ack() -> Self {
Self {
auto_ack: false,
..Default::default()
}
}
pub fn blocks(&self) -> BTreeMap<Height, B> {
self.blocks.lock().clone()
}
pub fn tip(&self) -> Option<(Height, B::Digest)> {
*self.tip.lock()
}
pub fn pending_ack_heights(&self) -> Vec<Height> {
self.pending_acks
.lock()
.iter()
.map(|(height, _)| *height)
.collect()
}
pub fn acknowledge_next(&self) -> Option<Height> {
let (height, ack) = self.pending_acks.lock().pop_front()?;
ack.acknowledge();
Some(height)
}
pub async fn acknowledged(&self) -> Height {
loop {
if let Some(height) = self.acknowledge_next() {
return height;
}
self.notify.notified().await;
}
}
}
impl<B: Block> Reporter for Application<B> {
type Activity = Update<B>;
async fn report(&mut self, activity: Self::Activity) {
match activity {
Update::Block(block, ack_tx) => {
let height = block.height();
self.blocks.lock().insert(height, block);
if self.auto_ack {
ack_tx.acknowledge();
} else {
self.pending_acks.lock().push_back((height, ack_tx));
self.notify.notify_one();
}
}
Update::Tip(_, height, digest) => {
*self.tip.lock() = Some((height, digest));
}
}
}
}