1#[cfg(with_metrics)]
5use std::sync::LazyLock;
6
7use async_graphql::SimpleObject;
8use linera_base::{
9 data_types::{ArithmeticError, BlockHeight},
10 ensure,
11 identifiers::ChainId,
12};
13#[cfg(with_testing)]
14use linera_views::context::MemoryContext;
15use linera_views::{
16 context::Context,
17 queue_view::QueueView,
18 register_view::RegisterView,
19 views::{ClonableView, View, ViewError},
20};
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23
24use crate::{data_types::MessageBundle, ChainError, Origin};
25
26#[cfg(test)]
27#[path = "unit_tests/inbox_tests.rs"]
28mod inbox_tests;
29
30#[cfg(with_metrics)]
31use {
32 linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec},
33 prometheus::HistogramVec,
34};
35
/// Histogram registered under the name `inbox_size`.
///
/// In this file it is observed with the length of `added_bundles` right after every
/// `push_back` / `delete_front` on that queue.
#[cfg(with_metrics)]
static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
    // Presumably exponential buckets spanning 1 to 500,000; see `exponential_bucket_interval`.
    let buckets = exponential_bucket_interval(1.0, 500_000.0);
    register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
});
45
/// Histogram registered under the name `removed_bundles`.
///
/// In this file it is observed with the length of `removed_bundles` right after every
/// `push_back` / `delete_front` on that queue.
#[cfg(with_metrics)]
static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
    // Presumably exponential buckets spanning 1 to 10,000; see `exponential_bucket_interval`.
    let buckets = exponential_bucket_interval(1.0, 10_000.0);
    register_histogram_vec(
        "removed_bundles",
        "Number of bundles removed by anticipation",
        &[],
        buckets,
    )
});
55
// State of an inbox: two cursors and two queues of message bundles.
//
// Bundles enter through `add_bundle` and leave through `remove_bundle`. The two
// operations may reach a given bundle in either order:
// * a bundle that is added first waits in `added_bundles` until `remove_bundle`
//   consumes it or skips over it;
// * a bundle that is removed first waits in `removed_bundles` until `add_bundle`
//   is called with the same bundle.
// Each queue is only pushed to when the other queue has no front element.
//
// NOTE(review): plain `//` comments are used on purpose. `SimpleObject` exposes `///`
// doc comments as GraphQL descriptions, so `///` here would alter the generated schema.
#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
pub struct InboxStateView<C>
where
    C: Clone + Context + Send + Sync,
{
    // Lowest cursor `add_bundle` accepts; after a successful call it is one index past
    // the cursor of the bundle that was just added.
    pub next_cursor_to_add: RegisterView<C, Cursor>,
    // Lowest cursor `remove_bundle` accepts; after a successful call it is one index
    // past the cursor of the bundle that was just removed.
    pub next_cursor_to_remove: RegisterView<C, Cursor>,
    // Bundles that have been added and not yet removed, oldest at the front.
    pub added_bundles: QueueView<C, MessageBundle>,
    // Bundles that have been removed before being added, oldest at the front.
    pub removed_bundles: QueueView<C, MessageBundle>,
}
83
// Position of a message bundle: a block height plus an index within that height
// (taken from the bundle's `transaction_index`).
//
// The derived `Ord`/`PartialOrd` compare fields in declaration order, i.e. by `height`
// first and by `index` second, so the field order below must not be changed.
//
// NOTE(review): plain `//` comments are used on purpose. `SimpleObject` exposes `///`
// doc comments as GraphQL descriptions, so `///` here would alter the generated schema.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    SimpleObject,
)]
pub struct Cursor {
    // Block height of the bundle.
    height: BlockHeight,
    // Index of the bundle at that height.
    index: u32,
}
102
103#[derive(Error, Debug)]
104pub(crate) enum InboxError {
105 #[error(transparent)]
106 ViewError(#[from] ViewError),
107 #[error(transparent)]
108 ArithmeticError(#[from] ArithmeticError),
109 #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
110 UnexpectedBundle {
111 bundle: MessageBundle,
112 previous_bundle: MessageBundle,
113 },
114 #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
115 IncorrectOrder {
116 bundle: MessageBundle,
117 next_cursor: Cursor,
118 },
119 #[error(
120 "{bundle:?} cannot be skipped: it must be received before the next \
121 messages from the same origin"
122 )]
123 UnskippableBundle { bundle: MessageBundle },
124}
125
126impl From<&MessageBundle> for Cursor {
127 #[inline]
128 fn from(bundle: &MessageBundle) -> Self {
129 Self {
130 height: bundle.height,
131 index: bundle.transaction_index,
132 }
133 }
134}
135
136impl Cursor {
137 fn try_add_one(self) -> Result<Self, ArithmeticError> {
138 let value = Self {
139 height: self.height,
140 index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
141 };
142 Ok(value)
143 }
144}
145
146impl From<(ChainId, Origin, InboxError)> for ChainError {
147 fn from(value: (ChainId, Origin, InboxError)) -> Self {
148 let (chain_id, origin, error) = value;
149 match error {
150 InboxError::ViewError(e) => ChainError::ViewError(e),
151 InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
152 InboxError::UnexpectedBundle {
153 bundle,
154 previous_bundle,
155 } => ChainError::UnexpectedMessage {
156 chain_id,
157 origin: origin.into(),
158 bundle: Box::new(bundle),
159 previous_bundle: Box::new(previous_bundle),
160 },
161 InboxError::IncorrectOrder {
162 bundle,
163 next_cursor,
164 } => ChainError::IncorrectMessageOrder {
165 chain_id,
166 origin: origin.into(),
167 bundle: Box::new(bundle),
168 next_height: next_cursor.height,
169 next_index: next_cursor.index,
170 },
171 InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
172 chain_id,
173 origin: origin.into(),
174 bundle: Box::new(bundle),
175 },
176 }
177 }
178}
179
180impl<C> InboxStateView<C>
181where
182 C: Context + Clone + Send + Sync + 'static,
183{
184 pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
187 let cursor = self.next_cursor_to_add.get();
188 if cursor.index == 0 {
189 Ok(cursor.height)
190 } else {
191 Ok(cursor.height.try_add_one()?)
192 }
193 }
194
195 pub(crate) async fn remove_bundle(
199 &mut self,
200 bundle: &MessageBundle,
201 ) -> Result<bool, InboxError> {
202 let cursor = Cursor::from(bundle);
204 ensure!(
205 cursor >= *self.next_cursor_to_remove.get(),
206 InboxError::IncorrectOrder {
207 bundle: bundle.clone(),
208 next_cursor: *self.next_cursor_to_remove.get(),
209 }
210 );
211 while let Some(previous_bundle) = self.added_bundles.front().await? {
213 if Cursor::from(&previous_bundle) >= cursor {
214 break;
215 }
216 ensure!(
217 previous_bundle.is_skippable(),
218 InboxError::UnskippableBundle {
219 bundle: previous_bundle
220 }
221 );
222 self.added_bundles.delete_front();
223 #[cfg(with_metrics)]
224 INBOX_SIZE
225 .with_label_values(&[])
226 .observe(self.added_bundles.count() as f64);
227 tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
228 }
229 let already_known = match self.added_bundles.front().await? {
231 Some(previous_bundle) => {
232 ensure!(
238 bundle == &previous_bundle,
239 InboxError::UnexpectedBundle {
240 previous_bundle,
241 bundle: bundle.clone(),
242 }
243 );
244 self.added_bundles.delete_front();
245 #[cfg(with_metrics)]
246 INBOX_SIZE
247 .with_label_values(&[])
248 .observe(self.added_bundles.count() as f64);
249 tracing::trace!("Consuming bundle {:?}", bundle);
250 true
251 }
252 None => {
253 tracing::trace!("Marking bundle as expected: {:?}", bundle);
254 self.removed_bundles.push_back(bundle.clone());
255 #[cfg(with_metrics)]
256 REMOVED_BUNDLES
257 .with_label_values(&[])
258 .observe(self.removed_bundles.count() as f64);
259 false
260 }
261 };
262 self.next_cursor_to_remove.set(cursor.try_add_one()?);
263 Ok(already_known)
264 }
265
266 pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
271 let cursor = Cursor::from(&bundle);
273 ensure!(
274 cursor >= *self.next_cursor_to_add.get(),
275 InboxError::IncorrectOrder {
276 bundle: bundle.clone(),
277 next_cursor: *self.next_cursor_to_add.get(),
278 }
279 );
280 let newly_added = match self.removed_bundles.front().await? {
282 Some(previous_bundle) => {
283 if Cursor::from(&previous_bundle) == cursor {
284 ensure!(
287 bundle == previous_bundle,
288 InboxError::UnexpectedBundle {
289 previous_bundle,
290 bundle,
291 }
292 );
293 self.removed_bundles.delete_front();
294 #[cfg(with_metrics)]
295 REMOVED_BUNDLES
296 .with_label_values(&[])
297 .observe(self.removed_bundles.count() as f64);
298 } else {
299 ensure!(
302 cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
303 InboxError::UnexpectedBundle {
304 previous_bundle,
305 bundle,
306 }
307 );
308 }
309 false
310 }
311 None => {
312 self.added_bundles.push_back(bundle);
314 #[cfg(with_metrics)]
315 INBOX_SIZE
316 .with_label_values(&[])
317 .observe(self.added_bundles.count() as f64);
318 true
319 }
320 };
321 self.next_cursor_to_add.set(cursor.try_add_one()?);
322 Ok(newly_added)
323 }
324}
325
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Loads an inbox from a context created with `MemoryContext::new_for_testing`.
    ///
    /// # Panics
    /// Panics if loading the view from that context fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}