use allocative::Allocative;
use async_graphql::SimpleObject;
use linera_base::{
data_types::{ArithmeticError, BlockHeight},
ensure,
identifiers::ChainId,
};
#[cfg(with_testing)]
use linera_views::context::MemoryContext;
use linera_views::{
context::Context,
queue_view::QueueView,
register_view::RegisterView,
views::{ClonableView, View},
ViewError,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::{data_types::MessageBundle, ChainError};
#[cfg(test)]
#[path = "unit_tests/inbox_tests.rs"]
mod inbox_tests;
#[cfg(with_metrics)]
mod metrics {
    //! Prometheus histograms recorded by the inbox operations in this file.

    use std::sync::LazyLock;

    use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec};
    use prometheus::HistogramVec;

    /// Observed with the length of `added_bundles` each time that queue is
    /// pushed to or popped from.
    pub static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 2_000_000.0);
        register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
    });

    /// Observed with the length of `removed_bundles` each time that queue is
    /// pushed to or popped from.
    pub static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
        let buckets = exponential_bucket_interval(1.0, 10_000.0);
        register_histogram_vec(
            "removed_bundles",
            "Number of bundles removed by anticipation",
            &[],
            buckets,
        )
    });
}
// Persistent state of one inbox: two cursors and two queues of message bundles.
//
// `add_bundle` and `remove_bundle` (see the `impl` below) are the operations that
// update it. They may reach a given bundle in either order; each queue holds what
// one of them has processed and the other has not yet matched.
//
// NOTE(review): plain `//` comments are used on purpose. The `SimpleObject` derive
// turns `///` docs into GraphQL schema descriptions, so switching to `///` would
// change the generated schema.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Allocative, Debug, ClonableView, View)]
#[allocative(bound = "C")]
pub struct InboxStateView<C>
where
C: Clone + Context,
{
// Lowest cursor `add_bundle` still accepts; it is set to one index past the
// cursor of every bundle that `add_bundle` processes successfully.
pub next_cursor_to_add: RegisterView<C, Cursor>,
// Lowest cursor `remove_bundle` still accepts; it is set to one index past the
// cursor of every bundle that `remove_bundle` processes successfully.
pub next_cursor_to_remove: RegisterView<C, Cursor>,
// Bundles pushed by `add_bundle` that `remove_bundle` has not yet consumed
// or skipped.
pub added_bundles: QueueView<C, MessageBundle>,
// Bundles recorded by `remove_bundle` while `added_bundles` had nothing left to
// match; `add_bundle` pops the front entry when the same bundle arrives.
pub removed_bundles: QueueView<C, MessageBundle>,
}
// Position of a bundle as a `(height, index)` pair; `index` is taken from the
// bundle's `transaction_index` (see `From<&MessageBundle>` below).
//
// The derived ordering compares `height` first and `index` second, so the field
// order here is significant.
//
// NOTE(review): plain `//` comments are used on purpose. The `SimpleObject` derive
// turns `///` docs into GraphQL schema descriptions.
#[derive(
Debug,
Default,
Clone,
Copy,
Hash,
Eq,
PartialEq,
Ord,
PartialOrd,
Serialize,
Deserialize,
SimpleObject,
Allocative,
)]
pub struct Cursor {
// Block height of the bundle.
height: BlockHeight,
// Index of the bundle's transaction (copied from `transaction_index`).
index: u32,
}
/// Errors produced by the inbox operations in this file.
///
/// Callers that know the chain and origin can turn this into a `ChainError`
/// through the `From<(ChainId, ChainId, InboxError)>` conversion below.
#[derive(Error, Debug)]
pub(crate) enum InboxError {
/// An error from the underlying views, forwarded unchanged.
#[error(transparent)]
ViewError(#[from] ViewError),
/// An arithmetic failure, e.g. overflow when advancing a cursor index.
#[error(transparent)]
ArithmeticError(#[from] ArithmeticError),
/// `bundle` conflicts with `previous_bundle`, the entry at the front of the
/// opposite queue (it differs from it, or cannot be skipped in its favor).
#[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
UnexpectedBundle {
bundle: MessageBundle,
previous_bundle: MessageBundle,
},
/// The cursor of `bundle` is lower than `next_cursor`, the lowest cursor the
/// operation still accepts.
#[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
IncorrectOrder {
bundle: MessageBundle,
next_cursor: Cursor,
},
/// Removing a later bundle would discard `bundle`, but `bundle.is_skippable()`
/// returned `false`.
#[error(
"{bundle:?} cannot be skipped: it must be received before the next \
messages from the same origin"
)]
UnskippableBundle { bundle: MessageBundle },
}
impl From<&MessageBundle> for Cursor {
#[inline]
fn from(bundle: &MessageBundle) -> Self {
Self {
height: bundle.height,
index: bundle.transaction_index,
}
}
}
impl Cursor {
    /// Returns the cursor at the same height with `index` incremented by one.
    ///
    /// # Errors
    ///
    /// Returns `ArithmeticError::Overflow` if `index` is already `u32::MAX`.
    fn try_add_one(self) -> Result<Self, ArithmeticError> {
        match self.index.checked_add(1) {
            Some(index) => Ok(Self { index, ..self }),
            None => Err(ArithmeticError::Overflow),
        }
    }
}
impl From<(ChainId, ChainId, InboxError)> for ChainError {
fn from(value: (ChainId, ChainId, InboxError)) -> Self {
let (chain_id, origin, error) = value;
match error {
InboxError::ViewError(e) => ChainError::ViewError(e),
InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
InboxError::UnexpectedBundle {
bundle,
previous_bundle,
} => ChainError::UnexpectedMessage {
chain_id,
origin,
bundle: Box::new(bundle),
previous_bundle: Box::new(previous_bundle),
},
InboxError::IncorrectOrder {
bundle,
next_cursor,
} => ChainError::IncorrectMessageOrder {
chain_id,
origin,
bundle: Box::new(bundle),
next_height: next_cursor.height,
next_index: next_cursor.index,
},
InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
chain_id,
origin,
bundle: Box::new(bundle),
},
}
}
}
impl<C> InboxStateView<C>
where
C: Context + Clone + 'static,
{
pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
let cursor = self.next_cursor_to_add.get();
if cursor.index == 0 {
Ok(cursor.height)
} else {
Ok(cursor.height.try_add_one()?)
}
}
pub(crate) async fn remove_bundle(
&mut self,
bundle: &MessageBundle,
) -> Result<bool, InboxError> {
let cursor = Cursor::from(bundle);
ensure!(
cursor >= *self.next_cursor_to_remove.get(),
InboxError::IncorrectOrder {
bundle: bundle.clone(),
next_cursor: *self.next_cursor_to_remove.get(),
}
);
while let Some(previous_bundle) = self.added_bundles.front().await? {
if Cursor::from(&previous_bundle) >= cursor {
break;
}
ensure!(
previous_bundle.is_skippable(),
InboxError::UnskippableBundle {
bundle: previous_bundle
}
);
self.added_bundles.delete_front();
#[cfg(with_metrics)]
metrics::INBOX_SIZE
.with_label_values(&[])
.observe(self.added_bundles.count() as f64);
tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
}
let already_known = match self.added_bundles.front().await? {
Some(previous_bundle) => {
ensure!(
bundle == &previous_bundle,
InboxError::UnexpectedBundle {
previous_bundle,
bundle: bundle.clone(),
}
);
self.added_bundles.delete_front();
#[cfg(with_metrics)]
metrics::INBOX_SIZE
.with_label_values(&[])
.observe(self.added_bundles.count() as f64);
tracing::trace!("Consuming bundle {:?}", bundle);
true
}
None => {
tracing::trace!("Marking bundle as expected: {:?}", bundle);
self.removed_bundles.push_back(bundle.clone());
#[cfg(with_metrics)]
metrics::REMOVED_BUNDLES
.with_label_values(&[])
.observe(self.removed_bundles.count() as f64);
false
}
};
self.next_cursor_to_remove.set(cursor.try_add_one()?);
Ok(already_known)
}
pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
let cursor = Cursor::from(&bundle);
ensure!(
cursor >= *self.next_cursor_to_add.get(),
InboxError::IncorrectOrder {
bundle: bundle.clone(),
next_cursor: *self.next_cursor_to_add.get(),
}
);
let newly_added = match self.removed_bundles.front().await? {
Some(previous_bundle) => {
if Cursor::from(&previous_bundle) == cursor {
ensure!(
bundle == previous_bundle,
InboxError::UnexpectedBundle {
previous_bundle,
bundle,
}
);
self.removed_bundles.delete_front();
#[cfg(with_metrics)]
metrics::REMOVED_BUNDLES
.with_label_values(&[])
.observe(self.removed_bundles.count() as f64);
} else {
ensure!(
cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
InboxError::UnexpectedBundle {
previous_bundle,
bundle,
}
);
}
false
}
None => {
self.added_bundles.push_back(bundle);
#[cfg(with_metrics)]
metrics::INBOX_SIZE
.with_label_values(&[])
.observe(self.added_bundles.count() as f64);
true
}
};
self.next_cursor_to_add.set(cursor.try_add_one()?);
Ok(newly_added)
}
}
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox from a fresh in-memory testing context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}