use allocative::Allocative;
use linera_base::data_types::{ArithmeticError, BlockHeight};
#[cfg(with_testing)]
use linera_views::context::MemoryContext;
use linera_views::{
bucket_queue_view::BucketQueueView,
context::Context,
register_view::RegisterView,
views::{ClonableView, View},
ViewError,
};
#[cfg(test)]
#[path = "unit_tests/outbox_tests.rs"]
mod outbox_tests;
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Histogram of the outbox queue length.
    ///
    /// A sample is observed each time a height is scheduled and each time
    /// heights are marked as received. The histogram is registered lazily on
    /// first access and carries no labels.
    pub static OUTBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        // NOTE(review): presumably exponentially spaced buckets spanning 1 to 10_000 —
        // confirm against `exponential_bucket_interval`.
        let buckets = exponential_bucket_interval(1.0, 10_000.0);
        register_histogram_vec("outbox_size", "Outbox size", &[], buckets)
    });
}
/// Const generic argument given to the `BucketQueueView` that stores the outbox queue.
// NOTE(review): presumably the number of block heights kept per bucket — confirm
// against `BucketQueueView`.
const BLOCK_HEIGHT_BUCKET_SIZE: usize = 1_000;
/// View holding the state of an outbox.
///
/// The state is a FIFO queue of block heights: `schedule_message` appends a
/// height at the back, and `mark_messages_as_received` removes heights from the
/// front up to (and including) the acknowledged height.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, ClonableView, View, Allocative)]
#[allocative(bound = "C")]
pub struct OutboxStateView<C>
where
C: Context + 'static,
{
/// Lowest block height that `schedule_message` still accepts. After a height
/// is scheduled, this is set to that height plus one.
pub next_height_to_schedule: RegisterView<C, BlockHeight>,
/// Block heights that were scheduled and have not yet been marked as received,
/// in the order they were scheduled.
pub queue: BucketQueueView<C, BlockHeight, BLOCK_HEIGHT_BUCKET_SIZE>,
}
impl<C> OutboxStateView<C>
where
    C: Context + Clone + 'static,
{
    /// Schedules `height` for sending unless it lies below the next height to schedule.
    ///
    /// Returns `Ok(true)` when the height was appended to the queue, and
    /// `Ok(false)` when it was skipped because it is lower than
    /// `next_height_to_schedule`.
    ///
    /// # Errors
    ///
    /// Returns an [`ArithmeticError`] if `height` cannot be incremented; in that
    /// case neither the register nor the queue is modified.
    pub(crate) fn schedule_message(
        &mut self,
        height: BlockHeight,
    ) -> Result<bool, ArithmeticError> {
        let already_covered = height < *self.next_height_to_schedule.get();
        if already_covered {
            return Ok(false);
        }

        // Compute the successor first so that a failure leaves the state untouched.
        let following_height = height.try_add_one()?;
        self.next_height_to_schedule.set(following_height);
        self.queue.push_back(height);

        #[cfg(with_metrics)]
        {
            let outbox_len = self.queue.count() as f64;
            metrics::OUTBOX_SIZE
                .with_label_values(&[])
                .observe(outbox_len);
        }

        Ok(true)
    }

    /// Removes every queued height that is less than or equal to `height`.
    ///
    /// Heights are taken from the front of the queue until the queue is empty or
    /// its front element is greater than `height`. The removed heights are
    /// returned in the order they were dequeued; the vector is empty when
    /// nothing was removed.
    ///
    /// # Errors
    ///
    /// Returns a [`ViewError`] if deleting the front of the queue fails. Heights
    /// removed before the failure are not returned.
    pub(crate) async fn mark_messages_as_received(
        &mut self,
        height: BlockHeight,
    ) -> Result<Vec<BlockHeight>, ViewError> {
        let mut acknowledged = Vec::new();

        loop {
            match self.queue.front().copied() {
                Some(front) if front <= height => {
                    self.queue.delete_front().await?;
                    acknowledged.push(front);
                }
                // Either the queue is empty or the front is beyond `height`.
                _ => break,
            }
        }

        #[cfg(with_metrics)]
        {
            let outbox_len = self.queue.count() as f64;
            metrics::OUTBOX_SIZE
                .with_label_values(&[])
                .observe(outbox_len);
        }

        Ok(acknowledged)
    }
}
#[cfg(with_testing)]
impl OutboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an outbox view from a fresh in-memory testing context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        let test_context = MemoryContext::new_for_testing(());
        let loaded = Self::load(test_context).await;
        loaded.expect("Loading from memory should work")
    }
}