linera_chain/
inbox.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_metrics)]
5use std::sync::LazyLock;
6
7use async_graphql::SimpleObject;
8use linera_base::{
9    data_types::{ArithmeticError, BlockHeight},
10    ensure,
11    identifiers::ChainId,
12};
13#[cfg(with_testing)]
14use linera_views::context::MemoryContext;
15use linera_views::{
16    context::Context,
17    queue_view::QueueView,
18    register_view::RegisterView,
19    views::{ClonableView, View, ViewError},
20};
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23
24use crate::{data_types::MessageBundle, ChainError, Origin};
25
26#[cfg(test)]
27#[path = "unit_tests/inbox_tests.rs"]
28mod inbox_tests;
29
30#[cfg(with_metrics)]
31use {
32    linera_base::prometheus_util::{exponential_bucket_interval, register_histogram_vec},
33    prometheus::HistogramVec,
34};
35
/// Histogram of the number of bundles held in `added_bundles`, observed every time that
/// queue is pushed to or popped from.
#[cfg(with_metrics)]
static INBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(1.0, 500_000.0);
    register_histogram_vec("inbox_size", "Inbox size", &[], buckets)
});
45
/// Histogram of the number of bundles held in `removed_bundles`, observed every time that
/// queue is pushed to or popped from.
#[cfg(with_metrics)]
static REMOVED_BUNDLES: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(1.0, 10_000.0);
    register_histogram_vec(
        "removed_bundles",
        "Number of bundles removed by anticipation",
        &[],
        buckets,
    )
});
55
// NOTE(review): `SimpleObject` is derived on this type, and async-graphql is documented to
// publish `///` comments as GraphQL descriptions. The doc comments below are therefore
// presumably part of the exported schema — confirm before rewording them. Maintainer-only
// remarks are written as plain `//` comments for that reason.
/// The state of an inbox.
/// * An inbox is used to track bundles received and executed locally.
/// * A `MessageBundle` consists of a logical cursor `(height, index)` and some message
///   content `messages`.
/// * On the surface, an inbox looks like a FIFO queue: the main APIs are `add_bundle` and
///   `remove_bundle`.
/// * However, bundles can also be removed before they are added. When this happens,
///   the bundles removed by anticipation are tracked in a separate queue. Any bundle added
///   later will be required to match the first removed bundle and so on.
/// * The cursors of added bundles (resp. removed bundles) must be increasing over time.
/// * Reconciliation of added and removed bundles is allowed to skip some added bundles.
///   However, the opposite is not true: every removed bundle must be eventually added.
#[derive(Debug, ClonableView, View, async_graphql::SimpleObject)]
pub struct InboxStateView<C>
where
    C: Clone + Context + Send + Sync,
{
    // Set by `add_bundle` to one index past the cursor of the bundle it just accepted.
    /// We have already added all the messages below this height and index.
    pub next_cursor_to_add: RegisterView<C, Cursor>,
    // Set by `remove_bundle` to one index past the cursor of the bundle it just accepted.
    /// We have already removed all the messages below this height and index.
    pub next_cursor_to_remove: RegisterView<C, Cursor>,
    // Pushed by `add_bundle` only when `removed_bundles` is empty; popped by `remove_bundle`.
    /// These bundles have been added and are waiting to be removed.
    pub added_bundles: QueueView<C, MessageBundle>,
    // Pushed by `remove_bundle` only when `added_bundles` is empty; popped by `add_bundle`.
    /// These bundles have been removed by anticipation and are waiting to be added.
    /// At least one of `added_bundles` and `removed_bundles` should be empty.
    pub removed_bundles: QueueView<C, MessageBundle>,
}
83
// A position inside an inbox: a block height together with an index within that block.
// `From<&MessageBundle>` fills `index` from the bundle's `transaction_index`.
//
// Field order is significant: the derived `Ord`/`PartialOrd` compare `height` first and
// `index` second, and `add_bundle`/`remove_bundle` rely on that ordering for their
// cursor comparisons.
//
// NOTE(review): plain `//` comments are used on purpose. `SimpleObject` is derived here and
// async-graphql presumably exports `///` comments as schema descriptions — confirm before
// converting these remarks into doc comments.
#[derive(
    Debug,
    Default,
    Clone,
    Copy,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    SimpleObject,
)]
pub struct Cursor {
    // Height of the block the bundle belongs to.
    height: BlockHeight,
    // Index within the block; this is the only field that `try_add_one` increments.
    index: u32,
}
102
/// Errors produced by `InboxStateView::add_bundle` and `InboxStateView::remove_bundle`.
///
/// They are turned into `ChainError` values by the
/// `From<(ChainId, Origin, InboxError)>` implementation in this file.
#[derive(Error, Debug)]
pub(crate) enum InboxError {
    /// A failure reported by the underlying views (for example while reading a queue front).
    #[error(transparent)]
    ViewError(#[from] ViewError),
    /// Incrementing a cursor index overflowed.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),
    /// The bundle does not agree with the bundle it has to be reconciled with: either the
    /// two share a cursor but differ in content, or (in `add_bundle`) the new bundle is not
    /// a skippable bundle located before the next anticipated removal.
    #[error("Cannot reconcile {bundle:?} with {previous_bundle:?}")]
    UnexpectedBundle {
        bundle: MessageBundle,
        previous_bundle: MessageBundle,
    },
    /// The bundle's cursor is lower than the next cursor expected by the operation.
    // NOTE(review): the message says "Block and height", while `next_cursor` holds a height
    // and an index. The wording is left untouched because it is a runtime string.
    #[error("{bundle:?} is out of order. Block and height should be at least: {next_cursor:?}")]
    IncorrectOrder {
        bundle: MessageBundle,
        next_cursor: Cursor,
    },
    /// `remove_bundle` would have to discard an earlier added bundle for which
    /// `is_skippable()` returns `false`.
    #[error(
        "{bundle:?} cannot be skipped: it must be received before the next \
        messages from the same origin"
    )]
    UnskippableBundle { bundle: MessageBundle },
}
125
126impl From<&MessageBundle> for Cursor {
127    #[inline]
128    fn from(bundle: &MessageBundle) -> Self {
129        Self {
130            height: bundle.height,
131            index: bundle.transaction_index,
132        }
133    }
134}
135
136impl Cursor {
137    fn try_add_one(self) -> Result<Self, ArithmeticError> {
138        let value = Self {
139            height: self.height,
140            index: self.index.checked_add(1).ok_or(ArithmeticError::Overflow)?,
141        };
142        Ok(value)
143    }
144}
145
146impl From<(ChainId, Origin, InboxError)> for ChainError {
147    fn from(value: (ChainId, Origin, InboxError)) -> Self {
148        let (chain_id, origin, error) = value;
149        match error {
150            InboxError::ViewError(e) => ChainError::ViewError(e),
151            InboxError::ArithmeticError(e) => ChainError::ArithmeticError(e),
152            InboxError::UnexpectedBundle {
153                bundle,
154                previous_bundle,
155            } => ChainError::UnexpectedMessage {
156                chain_id,
157                origin: origin.into(),
158                bundle: Box::new(bundle),
159                previous_bundle: Box::new(previous_bundle),
160            },
161            InboxError::IncorrectOrder {
162                bundle,
163                next_cursor,
164            } => ChainError::IncorrectMessageOrder {
165                chain_id,
166                origin: origin.into(),
167                bundle: Box::new(bundle),
168                next_height: next_cursor.height,
169                next_index: next_cursor.index,
170            },
171            InboxError::UnskippableBundle { bundle } => ChainError::CannotSkipMessage {
172                chain_id,
173                origin: origin.into(),
174                bundle: Box::new(bundle),
175            },
176        }
177    }
178}
179
180impl<C> InboxStateView<C>
181where
182    C: Context + Clone + Send + Sync + 'static,
183{
184    /// Converts the internal cursor for added bundles into an externally-visible block height.
185    /// This makes sense because the rest of the system always adds bundles one block at a time.
186    pub fn next_block_height_to_receive(&self) -> Result<BlockHeight, ChainError> {
187        let cursor = self.next_cursor_to_add.get();
188        if cursor.index == 0 {
189            Ok(cursor.height)
190        } else {
191            Ok(cursor.height.try_add_one()?)
192        }
193    }
194
195    /// Consumes a bundle from the inbox.
196    ///
197    /// Returns `true` if the bundle was already known, i.e. it was present in `added_bundles`.
198    pub(crate) async fn remove_bundle(
199        &mut self,
200        bundle: &MessageBundle,
201    ) -> Result<bool, InboxError> {
202        // Record the latest cursor.
203        let cursor = Cursor::from(bundle);
204        ensure!(
205            cursor >= *self.next_cursor_to_remove.get(),
206            InboxError::IncorrectOrder {
207                bundle: bundle.clone(),
208                next_cursor: *self.next_cursor_to_remove.get(),
209            }
210        );
211        // Discard added bundles with lower cursors (if any).
212        while let Some(previous_bundle) = self.added_bundles.front().await? {
213            if Cursor::from(&previous_bundle) >= cursor {
214                break;
215            }
216            ensure!(
217                previous_bundle.is_skippable(),
218                InboxError::UnskippableBundle {
219                    bundle: previous_bundle
220                }
221            );
222            self.added_bundles.delete_front();
223            #[cfg(with_metrics)]
224            INBOX_SIZE
225                .with_label_values(&[])
226                .observe(self.added_bundles.count() as f64);
227            tracing::trace!("Skipping previously received bundle {:?}", previous_bundle);
228        }
229        // Reconcile the bundle with the next added bundle, or mark it as removed.
230        let already_known = match self.added_bundles.front().await? {
231            Some(previous_bundle) => {
232                // Rationale: If the two cursors are equal, then the bundles should match.
233                // Otherwise, at this point we know that `self.next_cursor_to_add >
234                // Cursor::from(&previous_bundle) > cursor`. Notably, `bundle` will never be
235                // added in the future. Therefore, we should fail instead of adding
236                // it to `self.removed_bundles`.
237                ensure!(
238                    bundle == &previous_bundle,
239                    InboxError::UnexpectedBundle {
240                        previous_bundle,
241                        bundle: bundle.clone(),
242                    }
243                );
244                self.added_bundles.delete_front();
245                #[cfg(with_metrics)]
246                INBOX_SIZE
247                    .with_label_values(&[])
248                    .observe(self.added_bundles.count() as f64);
249                tracing::trace!("Consuming bundle {:?}", bundle);
250                true
251            }
252            None => {
253                tracing::trace!("Marking bundle as expected: {:?}", bundle);
254                self.removed_bundles.push_back(bundle.clone());
255                #[cfg(with_metrics)]
256                REMOVED_BUNDLES
257                    .with_label_values(&[])
258                    .observe(self.removed_bundles.count() as f64);
259                false
260            }
261        };
262        self.next_cursor_to_remove.set(cursor.try_add_one()?);
263        Ok(already_known)
264    }
265
266    /// Pushes a bundle to the inbox. The verifications should not fail in production unless
267    /// many validators are faulty.
268    ///
269    /// Returns `true` if the bundle was new, `false` if it was already in `removed_bundles`.
270    pub(crate) async fn add_bundle(&mut self, bundle: MessageBundle) -> Result<bool, InboxError> {
271        // Record the latest cursor.
272        let cursor = Cursor::from(&bundle);
273        ensure!(
274            cursor >= *self.next_cursor_to_add.get(),
275            InboxError::IncorrectOrder {
276                bundle: bundle.clone(),
277                next_cursor: *self.next_cursor_to_add.get(),
278            }
279        );
280        // Find if the bundle was removed ahead of time.
281        let newly_added = match self.removed_bundles.front().await? {
282            Some(previous_bundle) => {
283                if Cursor::from(&previous_bundle) == cursor {
284                    // We already executed this bundle by anticipation. Remove it from
285                    // the queue.
286                    ensure!(
287                        bundle == previous_bundle,
288                        InboxError::UnexpectedBundle {
289                            previous_bundle,
290                            bundle,
291                        }
292                    );
293                    self.removed_bundles.delete_front();
294                    #[cfg(with_metrics)]
295                    REMOVED_BUNDLES
296                        .with_label_values(&[])
297                        .observe(self.removed_bundles.count() as f64);
298                } else {
299                    // The receiver has already executed a later bundle from the same
300                    // sender ahead of time so we should skip this one.
301                    ensure!(
302                        cursor < Cursor::from(&previous_bundle) && bundle.is_skippable(),
303                        InboxError::UnexpectedBundle {
304                            previous_bundle,
305                            bundle,
306                        }
307                    );
308                }
309                false
310            }
311            None => {
312                // Otherwise, schedule the messages for execution.
313                self.added_bundles.push_back(bundle);
314                #[cfg(with_metrics)]
315                INBOX_SIZE
316                    .with_label_values(&[])
317                    .observe(self.added_bundles.count() as f64);
318                true
319            }
320        };
321        self.next_cursor_to_add.set(cursor.try_add_one()?);
322        Ok(newly_added)
323    }
324}
325
#[cfg(with_testing)]
impl InboxStateView<MemoryContext<()>>
where
    MemoryContext<()>: Context + Clone + Send + Sync + 'static,
{
    /// Creates an inbox loaded from a fresh in-memory testing context.
    ///
    /// # Panics
    ///
    /// Panics if loading the view from the memory context fails.
    pub async fn new() -> Self {
        let loaded = Self::load(MemoryContext::new_for_testing(())).await;
        loaded.expect("Loading from memory should work")
    }
}
337}