Skip to main content

linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14        OracleResponse, Timestamp,
15    },
16    ensure,
17    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18    ownership::ChainOwnership,
19    time::{Duration, Instant},
20};
21use linera_execution::{
22    committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23    Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25    FLAG_MANDATORY_APPS_NEED_ACCEPTED_MESSAGE,
26};
27use linera_views::{
28    bucket_queue_view::BucketQueueView,
29    context::Context,
30    log_view::LogView,
31    map_view::MapView,
32    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
33    register_view::RegisterView,
34    set_view::SetView,
35    views::{ClonableView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38use tracing::{info, instrument};
39
40use crate::{
41    block::{Block, ConfirmedBlock},
42    block_tracker::BlockExecutionTracker,
43    data_types::{
44        BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
45        IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
46    },
47    inbox::{Cursor, InboxError, InboxStateView},
48    manager::ChainManager,
49    outbox::OutboxStateView,
50    pending_blobs::PendingBlobsView,
51    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
52};
53
54#[cfg(test)]
55#[path = "unit_tests/chain_tests.rs"]
56mod chain_tests;
57
58#[cfg(with_metrics)]
59use linera_base::prometheus_util::MeasureLatency;
60
/// Prometheus metrics recorded while executing blocks on a chain.
///
/// The whole module is gated on `with_metrics`, so the items inside do not need their own
/// `cfg` attributes.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of block execution latency.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_interval(50.0_f64, 10_000_000.0),
        )
    });

    /// Histogram of message execution latency.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 1_000_000.0),
        )
    });

    /// Histogram of operation execution latency.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 1_000_000.0),
        )
    });

    /// Histogram of the Wasm fuel reported by the resource tracker for a block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the EVM fuel reported by the resource tracker for a block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the number of read operations reported for a block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read reported for a block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written reported for a block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time taken to recompute the state hash, in microseconds.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash, in microseconds",
            &[],
            exponential_bucket_interval(1.0, 2_000_000.0),
        )
    });

    /// Histogram of the number of outboxes.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and records one observation in each per-block
    /// histogram from the corresponding field of `tracker`.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
186
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): this is a hard-coded value; presumably it must be kept in sync with the
/// serialized layout of [`Block`] — confirm that a test pins it.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
189
/// The origin, cursor and timestamp of an unskippable bundle in our inbox.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor of the bundle.
    pub entry: BundleInInbox,
    /// The timestamp when the bundle was added to the inbox.
    pub seen: Timestamp,
}
199
/// The origin and cursor of an unskippable bundle that is no longer in our inbox.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct BundleInInbox {
    /// The origin from which we received the bundle.
    pub origin: ChainId,
    /// The cursor of the bundle in the inbox.
    pub cursor: Cursor,
}
209
// Bucket size of the queue that stores `TimestampedBundleInInbox` entries.
// `TimestampedBundleInInbox` is a relatively small type, so 100 entries per bucket
// seems reasonable for storing the data.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
213
/// A view accessing the state of a chain.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    /// The incomplete set of blobs for the pending validated block, i.e. the validated block
    /// that is still missing blobs.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages, indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Unskippable bundles that have been removed but are still in the queue.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// The heights of previous blocks that sent messages to the same recipients.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,

    /// The indices of next events we expect to see per stream (could be ahead of the last
    /// executed block in sparse chains).
    pub next_expected_events: MapView<C, StreamId, u32>,

    /// Inboxes with at least one pending added bundle. This allows us to avoid loading all
    /// inboxes. `None` means the set hasn't been computed yet for this chain (backwards
    /// compatibility with pre-existing database entries).
    pub nonempty_inboxes: RegisterView<C, Option<BTreeSet<ChainId>>>,

    /// The local wall-clock time when block 0 was last executed. Used to prevent
    /// reset-on-incorrect-outcome from looping: if not enough time has elapsed since
    /// the last reset, the error is returned instead.
    pub block_zero_executed_at: RegisterView<C, Timestamp>,
}
286
/// Block-chaining state.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    pub next_block_height: BlockHeight,
    /// Number of incoming message bundles (running total, increased by `update_counters`).
    pub num_incoming_bundles: u32,
    /// Number of operations (running total, increased by `update_counters`).
    pub num_operations: u32,
    /// Number of outgoing messages (running total, increased by `update_counters`).
    pub num_outgoing_messages: u32,
}
302
303impl ChainTipState {
304    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
305    /// expected parent.
306    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
307        ensure!(
308            new_block.height == self.next_block_height,
309            ChainError::UnexpectedBlockHeight {
310                expected_block_height: self.next_block_height,
311                found_block_height: new_block.height
312            }
313        );
314        ensure!(
315            new_block.previous_block_hash == self.block_hash,
316            ChainError::UnexpectedPreviousBlockHash
317        );
318        Ok(())
319    }
320
321    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
322    /// it is higher than the tip.
323    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
324        ensure!(
325            self.next_block_height >= height,
326            ChainError::MissingEarlierBlocks {
327                current_block_height: self.next_block_height,
328            }
329        );
330        Ok(self.next_block_height > height)
331    }
332
333    /// Checks if the measurement counters would be valid.
334    pub fn update_counters(
335        &mut self,
336        transactions: &[Transaction],
337        messages: &[Vec<OutgoingMessage>],
338    ) -> Result<(), ChainError> {
339        let mut num_incoming_bundles = 0u32;
340        let mut num_operations = 0u32;
341
342        for transaction in transactions {
343            match transaction {
344                Transaction::ReceiveMessages(_) => {
345                    num_incoming_bundles = num_incoming_bundles
346                        .checked_add(1)
347                        .ok_or(ArithmeticError::Overflow)?;
348                }
349                Transaction::ExecuteOperation(_) => {
350                    num_operations = num_operations
351                        .checked_add(1)
352                        .ok_or(ArithmeticError::Overflow)?;
353                }
354            }
355        }
356
357        self.num_incoming_bundles = self
358            .num_incoming_bundles
359            .checked_add(num_incoming_bundles)
360            .ok_or(ArithmeticError::Overflow)?;
361
362        self.num_operations = self
363            .num_operations
364            .checked_add(num_operations)
365            .ok_or(ArithmeticError::Overflow)?;
366
367        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
368            .map_err(|_| ArithmeticError::Overflow)?;
369        self.num_outgoing_messages = self
370            .num_outgoing_messages
371            .checked_add(num_outgoing_messages)
372            .ok_or(ArithmeticError::Overflow)?;
373
374        Ok(())
375    }
376}
377
378impl<C> ChainStateView<C>
379where
380    C: Context + Clone + 'static,
381    C::Extra: ExecutionRuntimeContext,
382{
383    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
384    pub fn chain_id(&self) -> ChainId {
385        self.context().extra().chain_id()
386    }
387
388    #[instrument(skip_all, fields(
389        chain_id = %self.chain_id(),
390    ))]
391    pub async fn query_application(
392        &mut self,
393        local_time: Timestamp,
394        query: Query,
395        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
396    ) -> Result<QueryOutcome, ChainError> {
397        let context = QueryContext {
398            chain_id: self.chain_id(),
399            next_block_height: self.tip_state.get().next_block_height,
400            local_time,
401        };
402        self.execution_state
403            .query_application(context, query, service_runtime_endpoint)
404            .await
405            .with_execution_context(ChainExecutionContext::Query)
406    }
407
408    #[instrument(skip_all, fields(
409        chain_id = %self.chain_id(),
410        application_id = %application_id
411    ))]
412    pub async fn describe_application(
413        &mut self,
414        application_id: ApplicationId,
415    ) -> Result<ApplicationDescription, ChainError> {
416        self.execution_state
417            .system
418            .describe_application(application_id, &mut TransactionTracker::default())
419            .await
420            .with_execution_context(ChainExecutionContext::DescribeApplication)
421    }
422
423    #[instrument(skip_all, fields(
424        chain_id = %self.chain_id(),
425        target = %target,
426        height = %height
427    ))]
428    pub async fn mark_messages_as_received(
429        &mut self,
430        target: &ChainId,
431        height: BlockHeight,
432    ) -> Result<bool, ChainError> {
433        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
434        let updates = outbox.mark_messages_as_received(height).await?;
435        if updates.is_empty() {
436            return Ok(false);
437        }
438        for update in updates {
439            let counter = self
440                .outbox_counters
441                .get_mut()
442                .get_mut(&update)
443                .ok_or_else(|| {
444                    ChainError::CorruptedChainState("message counter should be present".into())
445                })?;
446            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
447            if *counter == 0 {
448                // Important for the test in `all_messages_delivered_up_to`.
449                self.outbox_counters.get_mut().remove(&update);
450            }
451        }
452        if outbox.queue.count() == 0 {
453            self.nonempty_outboxes.get_mut().remove(target);
454            // If the outbox is empty and not ahead of the executed blocks, remove it.
455            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
456                self.outboxes.remove_entry(target)?;
457            }
458        }
459        #[cfg(with_metrics)]
460        metrics::NUM_OUTBOXES
461            .with_label_values(&[])
462            .observe(self.nonempty_outboxes.get().len() as f64);
463        Ok(true)
464    }
465
466    /// Returns true if there are no more outgoing messages in flight up to the given
467    /// block height.
468    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
469        tracing::debug!(
470            "Messages left in {:.8}'s outbox: {:?}",
471            self.chain_id(),
472            self.outbox_counters.get()
473        );
474        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
475            key > &height
476        } else {
477            true
478        }
479    }
480
481    /// Invariant for the states of active chains.
482    pub async fn is_active(&self) -> Result<bool, ChainError> {
483        Ok(self.execution_state.system.is_active().await?)
484    }
485
486    /// Initializes the chain if it is not active yet.
487    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
488        // Initialize ourselves.
489        if self
490            .execution_state
491            .system
492            .initialize_chain(self.chain_id())
493            .await
494            .with_execution_context(ChainExecutionContext::Block)?
495        {
496            // The chain was already initialized.
497            return Ok(());
498        }
499        // Recompute the state hash.
500        let hash = self.execution_state.crypto_hash_mut().await?;
501        self.execution_state_hash.set(Some(hash));
502        self.reset_chain_manager(BlockHeight(0), local_time).await?;
503        Ok(())
504    }
505
506    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
507    ///
508    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
509    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
510        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
511            return Ok(height.saturating_add(BlockHeight(1)));
512        }
513        Ok(self.tip_state.get().next_block_height)
514    }
515
516    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
517    /// internal error if the bundle doesn't appear to be new, based on the sender's
518    /// height. The value `local_time` is specific to each validator and only used for
519    /// round timeouts.
520    ///
521    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
522    #[instrument(skip_all, fields(
523        chain_id = %self.chain_id(),
524        origin = %origin,
525        bundle_height = %bundle.height
526    ))]
527    pub async fn receive_message_bundle_with_inbox(
528        &mut self,
529        inbox: &mut InboxStateView<C>,
530        origin: &ChainId,
531        bundle: MessageBundle,
532        local_time: Timestamp,
533        add_to_received_log: bool,
534    ) -> Result<(), ChainError> {
535        assert!(!bundle.messages.is_empty());
536        let chain_id = self.chain_id();
537        tracing::trace!(
538            "Processing new messages from {origin} at height {}",
539            bundle.height,
540        );
541        let chain_and_height = ChainAndHeight {
542            chain_id: *origin,
543            height: bundle.height,
544        };
545
546        match self.initialize_if_needed(local_time).await {
547            Ok(_) => (),
548            // if the only issue was that we couldn't initialize the chain because of a
549            // missing chain description blob, we might still want to update the inbox
550            Err(ChainError::ExecutionError(exec_err, _))
551                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
552                if blobs.iter().all(|blob_id| {
553                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
554                })) => {}
555            err => {
556                return err;
557            }
558        }
559
560        // Process the inbox bundle and update the inbox state.
561        let newly_added = inbox
562            .add_bundle(bundle)
563            .await
564            .map_err(|error| match error {
565                InboxError::ViewError(error) => ChainError::ViewError(error),
566                error => ChainError::CorruptedChainState(format!(
567                    "while processing messages in certified block: {error}"
568                )),
569            })?;
570        if newly_added {
571            if let Some(set) = self.nonempty_inboxes.get_mut() {
572                set.insert(*origin);
573            }
574        }
575
576        // Remember the certificate for future validator/client synchronizations.
577        if add_to_received_log {
578            self.received_log.push(chain_and_height);
579        }
580        Ok(())
581    }
582
583    /// Updates the `received_log` trackers.
584    pub fn update_received_certificate_trackers(
585        &mut self,
586        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
587    ) {
588        for (name, tracker) in new_trackers {
589            self.received_certificate_trackers
590                .get_mut()
591                .entry(name)
592                .and_modify(|t| {
593                    // Because several synchronizations could happen in parallel, we need to make
594                    // sure to never go backward.
595                    if tracker > *t {
596                        *t = tracker;
597                    }
598                })
599                .or_insert(tracker);
600        }
601    }
602
603    pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
604        self.execution_state
605            .system
606            .current_committee()
607            .await?
608            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
609    }
610
611    pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
612        Ok(self.execution_state.system.ownership.get().await?)
613    }
614
615    /// Removes the incoming message bundles in the block from the inboxes.
616    ///
617    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
618    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
619    /// and `false` if the block is already confirmed.
620    #[instrument(skip_all, fields(
621        chain_id = %self.chain_id(),
622    ))]
623    pub async fn remove_bundles_from_inboxes(
624        &mut self,
625        timestamp: Timestamp,
626        must_be_present: bool,
627        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
628    ) -> Result<(), ChainError> {
629        let chain_id = self.chain_id();
630        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
631        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
632            ensure!(
633                bundle.timestamp <= timestamp,
634                ChainError::IncorrectBundleTimestamp {
635                    chain_id,
636                    bundle_timestamp: bundle.timestamp,
637                    block_timestamp: timestamp,
638                }
639            );
640            let bundles = bundles_by_origin.entry(*origin).or_default();
641            bundles.push(bundle);
642        }
643        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
644        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
645        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
646            tracing::trace!(
647                "Removing [{}] from inbox for {origin}",
648                bundles
649                    .iter()
650                    .map(|bundle| bundle.height.to_string())
651                    .collect::<Vec<_>>()
652                    .join(", ")
653            );
654            for bundle in bundles {
655                // Mark the message as processed in the inbox.
656                let was_present = inbox
657                    .remove_bundle(bundle)
658                    .await
659                    .map_err(|error| (chain_id, origin, error))?;
660                if must_be_present {
661                    ensure!(
662                        was_present,
663                        ChainError::MissingCrossChainUpdate {
664                            chain_id,
665                            origin,
666                            height: bundle.height,
667                        }
668                    );
669                }
670            }
671            inbox.observe_size_metric();
672            if inbox.added_bundles.count() == 0 {
673                if let Some(set) = self.nonempty_inboxes.get_mut() {
674                    set.remove(&origin);
675                }
676            }
677        }
678        Ok(())
679    }
680
681    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
682    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
683        self.nonempty_outboxes.get().iter().copied().collect()
684    }
685
686    /// Returns the outboxes for the given targets, or an error if any of them are missing.
687    pub async fn load_outboxes(
688        &self,
689        targets: &[ChainId],
690    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
691        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
692        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
693        optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
694    }
695
696    /// Executes a block with a specified policy for handling bundle failures.
697    #[instrument(skip_all, fields(
698        chain_id = %block.chain_id,
699        block_height = %block.height
700    ))]
701    #[expect(clippy::too_many_arguments)]
702    async fn execute_block_inner(
703        chain: &mut ExecutionStateView<C>,
704        confirmed_log: &LogView<C, CryptoHash>,
705        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
706        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
707        block: &mut ProposedBlock,
708        local_time: Timestamp,
709        round: Option<u32>,
710        published_blobs: &[Blob],
711        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
712        exec_policy: BundleExecutionPolicy,
713    ) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
714        // AutoRetry is incompatible with replaying oracle responses because discarding or
715        // rejecting bundles would change which transactions execute.
716        if !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort) {
717            assert!(
718                replaying_oracle_responses.is_none(),
719                "Cannot use AutoRetry policy when replaying oracle responses"
720            );
721        }
722
723        #[cfg(with_metrics)]
724        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
725        chain.system.timestamp.set(block.timestamp);
726
727        let committee_policy = chain
728            .system
729            .current_committee()
730            .await?
731            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
732            .1
733            .policy()
734            .clone();
735
736        let mut resource_controller = ResourceController::new(
737            Arc::new(committee_policy),
738            ResourceTracker::default(),
739            block.authenticated_signer,
740        );
741
742        for blob in published_blobs {
743            let blob_id = blob.id();
744            resource_controller
745                .policy()
746                .check_blob_size(blob.content())
747                .with_execution_context(ChainExecutionContext::Block)?;
748            chain.system.used_blobs.insert(&blob_id)?;
749        }
750
751        let mut block_execution_tracker = BlockExecutionTracker::new(
752            &mut resource_controller,
753            published_blobs
754                .iter()
755                .map(|blob| (blob.id(), blob))
756                .collect(),
757            local_time,
758            replaying_oracle_responses,
759            block,
760        )?;
761
762        // Extract failure policy settings.
763        let max_failures = match exec_policy.on_failure {
764            BundleFailurePolicy::Abort => 0,
765            BundleFailurePolicy::AutoRetry { max_failures } => max_failures,
766        };
767        let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
768        let mut failure_count = 0u32;
769
770        // Track cumulative bundle execution time if time budget is set.
771        let time_budget = exec_policy.time_budget;
772        let mut cumulative_bundle_time = Duration::ZERO;
773
774        let mut i = 0;
775        while i < block.transactions.len() {
776            let transaction = &mut block.transactions[i];
777            let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));
778
779            // Check if time budget has been exceeded for bundles.
780            if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
781                info!(
782                    ?cumulative_bundle_time,
783                    ?time_budget,
784                    "Time budget exceeded, discarding all remaining message bundles"
785                );
786                Self::discard_remaining_bundles(block, i, None);
787                continue;
788            }
789
790            // Checkpoint before bundle transactions if using auto-retry.
791            let checkpoint = if auto_retry && is_bundle {
792                Some((
793                    chain.clone_unchecked()?,
794                    block_execution_tracker.create_checkpoint(),
795                ))
796            } else {
797                None
798            };
799
800            // Track time for bundle execution when time budget is set.
801            let bundle_start = if is_bundle && time_budget.is_some() {
802                Some(Instant::now())
803            } else {
804                None
805            };
806
807            let result = block_execution_tracker
808                .execute_transaction(&*transaction, round, chain)
809                .await;
810
811            // Update cumulative bundle time.
812            if let Some(start) = bundle_start {
813                cumulative_bundle_time += start.elapsed();
814            }
815
816            // If the transaction executed successfully, we move on to the next one.
817            // On transient errors (e.g. missing blobs) we fail, so it can be retried after
818            // syncing. In auto-retry mode, we can discard or reject message bundles that failed
819            // with non-transient errors.
820            let (error, context, incoming_bundle, saved_chain, saved_tracker) =
821                match (result, transaction, checkpoint) {
822                    (Ok(()), _, _) => {
823                        i += 1;
824                        continue;
825                    }
826                    (
827                        Err(ChainError::ExecutionError(error, context)),
828                        Transaction::ReceiveMessages(incoming_bundle),
829                        Some((saved_chain, saved_tracker)),
830                    ) if !error.is_transient_error() => {
831                        (error, context, incoming_bundle, saved_chain, saved_tracker)
832                    }
833                    (Err(e), _, _) => return Err(e),
834                };
835
836            // Restore checkpoint.
837            *chain = saved_chain;
838            block_execution_tracker.restore_checkpoint(saved_tracker);
839
840            if error.is_limit_error() && i > 0 {
841                failure_count += 1;
842                // If we've exceeded max failures, discard all remaining message bundles.
843                let maybe_sender = if failure_count > max_failures {
844                    info!(
845                        failure_count,
846                        max_failures,
847                        "Exceeded max bundle failures, discarding all remaining message bundles"
848                    );
849                    None
850                } else {
851                    // Not the first - discard it and same-sender subsequent bundles.
852                    info!(
853                        %error,
854                        index = i,
855                        origin = %incoming_bundle.origin,
856                        "Message bundle exceeded block limits and will be discarded for \
857                        retry in a later block"
858                    );
859                    Some(incoming_bundle.origin)
860                };
861                Self::discard_remaining_bundles(block, i, maybe_sender);
862                // Continue without incrementing i (next transaction is now at i).
863            } else if incoming_bundle.bundle.is_protected()
864                || incoming_bundle.action == MessageAction::Reject
865            {
866                // Protected bundles cannot be rejected. Failed rejected bundles fail the block.
867                return Err(ChainError::ExecutionError(error, context));
868            } else {
869                // Reject the bundle: either a non-limit error, or the first bundle
870                // exceeded limits (and is inherently too large for any block).
871                info!(
872                    %error,
873                    index = i,
874                    origin = %incoming_bundle.origin,
875                    "Message bundle failed to execute and will be rejected"
876                );
877                incoming_bundle.action = MessageAction::Reject;
878                // Retry the transaction as rejected (don't increment i).
879            }
880        }
881
882        // This can only happen if all transactions were incoming bundles that all got discarded
883        // due to resource limit errors. This is unlikely in practice but theoretically possible.
884        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
885
886        let recipients = block_execution_tracker.recipients();
887        let heights = previous_message_blocks_view.multi_get(&recipients).await?;
888        let mut recipient_heights = Vec::new();
889        let mut indices = Vec::new();
890        for (height, recipient) in heights.into_iter().zip(recipients) {
891            if let Some(height) = height {
892                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
893                indices.push(index);
894                recipient_heights.push((recipient, height));
895            }
896        }
897        let hashes = confirmed_log.multi_get(indices).await?;
898        let mut previous_message_blocks = BTreeMap::new();
899        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
900            let hash = hash.ok_or_else(|| {
901                ChainError::CorruptedChainState("missing entry in confirmed_log".into())
902            })?;
903            previous_message_blocks.insert(recipient, (hash, height));
904        }
905
906        let streams = block_execution_tracker.event_streams();
907        let heights = previous_event_blocks_view.multi_get(&streams).await?;
908        let mut stream_heights = Vec::new();
909        let mut indices = Vec::new();
910        for (stream, height) in streams.into_iter().zip(heights) {
911            if let Some(height) = height {
912                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
913                indices.push(index);
914                stream_heights.push((stream, height));
915            }
916        }
917        let hashes = confirmed_log.multi_get(indices).await?;
918        let mut previous_event_blocks = BTreeMap::new();
919        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
920            let hash = hash.ok_or_else(|| {
921                ChainError::CorruptedChainState("missing entry in confirmed_log".into())
922            })?;
923            previous_event_blocks.insert(stream, (hash, height));
924        }
925
926        let state_hash = {
927            #[cfg(with_metrics)]
928            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
929            chain.crypto_hash_mut().await?
930        };
931
932        let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
933            block_execution_tracker.finalize(block.transactions.len());
934
935        Ok((
936            BlockExecutionOutcome {
937                messages,
938                previous_message_blocks,
939                previous_event_blocks,
940                state_hash,
941                oracle_responses,
942                events,
943                blobs,
944                operation_results,
945            },
946            resource_tracker,
947        ))
948    }
949
950    /// Discards all bundles from the given origin (or all if `None`), starting at the given index.
951    fn discard_remaining_bundles(
952        block: &mut ProposedBlock,
953        mut index: usize,
954        maybe_origin: Option<ChainId>,
955    ) {
956        while index < block.transactions.len() {
957            if matches!(
958                &block.transactions[index],
959                Transaction::ReceiveMessages(bundle)
960                if maybe_origin.is_none_or(|origin| bundle.origin == origin)
961            ) {
962                block.transactions.remove(index);
963            } else {
964                index += 1;
965            }
966        }
967    }
968
969    /// Executes a block with a specified policy for handling bundle failures.
970    ///
971    /// This method supports automatic retry with checkpointing when bundles fail:
972    /// - For limit errors (block too large, fuel exceeded, etc.): the bundle is discarded
973    ///   so it can be retried in a later block, unless it's the first transaction
974    ///   (which gets rejected as inherently too large).
975    /// - For non-limit errors: the bundle is rejected (triggering bounced messages).
976    /// - After `max_failures` failed bundles, all remaining message bundles are discarded.
977    ///
978    /// The block may be modified to reflect the actual executed transactions.
979    #[instrument(skip_all, fields(
980        chain_id = %self.chain_id(),
981        block_height = %block.height
982    ))]
983    pub async fn execute_block(
984        &mut self,
985        mut block: ProposedBlock,
986        local_time: Timestamp,
987        round: Option<u32>,
988        published_blobs: &[Blob],
989        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
990        policy: BundleExecutionPolicy,
991    ) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
992        assert_eq!(
993            block.chain_id,
994            self.execution_state.context().extra().chain_id()
995        );
996
997        self.initialize_if_needed(local_time).await?;
998
999        let chain_timestamp = *self.execution_state.system.timestamp.get();
1000        ensure!(
1001            chain_timestamp <= block.timestamp,
1002            ChainError::InvalidBlockTimestamp {
1003                parent: chain_timestamp,
1004                new: block.timestamp
1005            }
1006        );
1007        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1008
1009        ensure!(
1010            block.published_blob_ids()
1011                == published_blobs
1012                    .iter()
1013                    .map(|blob| blob.id())
1014                    .collect::<BTreeSet<_>>(),
1015            ChainError::InternalError("published_blobs mismatch".to_string())
1016        );
1017
1018        if *self.execution_state.system.closed.get() {
1019            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
1020        }
1021
1022        let mandatory_apps_need_accepted_message = self
1023            .current_committee()
1024            .await?
1025            .1
1026            .policy()
1027            .http_request_allow_list
1028            .contains(FLAG_MANDATORY_APPS_NEED_ACCEPTED_MESSAGE);
1029        Self::check_app_permissions(
1030            self.execution_state
1031                .system
1032                .application_permissions
1033                .get()
1034                .await?,
1035            &block,
1036            mandatory_apps_need_accepted_message,
1037        )?;
1038
1039        Self::execute_block_inner(
1040            &mut self.execution_state,
1041            &self.confirmed_log,
1042            &self.previous_message_blocks,
1043            &self.previous_event_blocks,
1044            &mut block,
1045            local_time,
1046            round,
1047            published_blobs,
1048            replaying_oracle_responses,
1049            policy,
1050        )
1051        .await
1052        .map(|(outcome, tracker)| (block, outcome, tracker))
1053    }
1054
1055    /// Tracks emitted events per stream and returns the set of streams where new contiguous
1056    /// events were observed (starting from `next_expected_events`).
1057    ///
1058    /// Callers must ensure that `next_expected_events` has been initialized for every stream
1059    /// present in the block's events before calling this method. See
1060    /// `ChainWorkerState::initialize_next_expected_events`.
1061    async fn process_emitted_events(
1062        &mut self,
1063        block: &Block,
1064    ) -> Result<BTreeSet<StreamId>, ChainError> {
1065        let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1066        for event in block.body.events.iter().flatten() {
1067            emitted_streams
1068                .entry(event.stream_id.clone())
1069                .or_default()
1070                .insert(event.index);
1071        }
1072
1073        let mut updated_streams = BTreeSet::new();
1074        for (stream_id, indices) in emitted_streams {
1075            // Epoch 0 is created at genesis, so the first published event is index 1.
1076            let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1077                1
1078            } else {
1079                0
1080            };
1081            let mut current_expected_index = self
1082                .next_expected_events
1083                .get(&stream_id)
1084                .await?
1085                .unwrap_or(initial_index);
1086            for index in indices {
1087                if index == current_expected_index {
1088                    updated_streams.insert(stream_id.clone());
1089                    current_expected_index = index.saturating_add(1);
1090                }
1091            }
1092            if current_expected_index != 0 {
1093                self.next_expected_events
1094                    .insert(&stream_id, current_expected_index)?;
1095            }
1096        }
1097        Ok(updated_streams)
1098    }
1099
1100    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
1101    /// manager. This does not touch the execution state itself, which must be updated separately.
1102    /// Returns the set of event streams that were updated as a result of applying the block.
1103    #[instrument(skip_all, fields(
1104        chain_id = %self.chain_id(),
1105        block_height = %block.inner().inner().header.height
1106    ))]
1107    pub async fn apply_confirmed_block(
1108        &mut self,
1109        block: &ConfirmedBlock,
1110        local_time: Timestamp,
1111    ) -> Result<BTreeSet<StreamId>, ChainError> {
1112        let hash = block.inner().hash();
1113        let block = block.inner().inner();
1114        if block.header.height == BlockHeight::ZERO {
1115            self.block_zero_executed_at.set(local_time);
1116        }
1117        self.execution_state_hash.set(Some(block.header.state_hash));
1118        let recipients = self.process_outgoing_messages(block).await?;
1119
1120        for recipient in recipients {
1121            self.previous_message_blocks
1122                .insert(&recipient, block.header.height)?;
1123        }
1124        for event in block.body.events.iter().flatten() {
1125            self.previous_event_blocks
1126                .insert(&event.stream_id, block.header.height)?;
1127        }
1128        let updated_streams = self.process_emitted_events(block).await?;
1129        // Last, reset the consensus state based on the current ownership.
1130        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1131            .await?;
1132
1133        // Advance to next block height.
1134        let tip = self.tip_state.get_mut();
1135        tip.block_hash = Some(hash);
1136        tip.next_block_height.try_add_assign_one()?;
1137        tip.update_counters(&block.body.transactions, &block.body.messages)?;
1138        self.confirmed_log.push(hash);
1139        self.preprocessed_blocks.remove(&block.header.height)?;
1140        Ok(updated_streams)
1141    }
1142
1143    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
1144    /// Returns the set of event streams that were updated as a result of preprocessing the block.
1145    #[instrument(skip_all, fields(
1146        chain_id = %self.chain_id(),
1147        block_height = %block.inner().inner().header.height
1148    ))]
1149    pub async fn preprocess_block(
1150        &mut self,
1151        block: &ConfirmedBlock,
1152    ) -> Result<BTreeSet<StreamId>, ChainError> {
1153        let hash = block.inner().hash();
1154        let block = block.inner().inner();
1155        let height = block.header.height;
1156        if height < self.tip_state.get().next_block_height {
1157            return Ok(BTreeSet::new());
1158        }
1159        self.process_outgoing_messages(block).await?;
1160        let updated_streams = self.process_emitted_events(block).await?;
1161        self.preprocessed_blocks.insert(&height, hash)?;
1162        Ok(updated_streams)
1163    }
1164
1165    /// Returns whether this is a child chain.
1166    pub async fn is_child(&self) -> Result<bool, ChainError> {
1167        let description = self.execution_state.system.description.get().await?;
1168        let Some(description) = description else {
1169            // Root chains are always initialized, so this must be a child chain.
1170            return Ok(true);
1171        };
1172        Ok(description.is_child())
1173    }
1174
1175    /// Verifies that the block is valid according to the chain's application permission settings.
1176    #[instrument(skip_all, fields(
1177        block_height = %block.height,
1178        num_transactions = %block.transactions.len()
1179    ))]
1180    fn check_app_permissions(
1181        app_permissions: &ApplicationPermissions,
1182        block: &ProposedBlock,
1183        mandatory_apps_need_accepted_message: bool,
1184    ) -> Result<(), ChainError> {
1185        let mut mandatory = HashSet::<ApplicationId>::from_iter(
1186            app_permissions.mandatory_applications.iter().copied(),
1187        );
1188        for transaction in &block.transactions {
1189            match transaction {
1190                Transaction::ExecuteOperation(operation)
1191                    if operation.is_exempt_from_permissions() =>
1192                {
1193                    mandatory.clear()
1194                }
1195                Transaction::ExecuteOperation(operation) => {
1196                    ensure!(
1197                        app_permissions.can_execute_operations(&operation.application_id()),
1198                        ChainError::AuthorizedApplications(
1199                            app_permissions.execute_operations.clone().unwrap()
1200                        )
1201                    );
1202                    if let Operation::User { application_id, .. } = operation {
1203                        mandatory.remove(application_id);
1204                    }
1205                }
1206                Transaction::ReceiveMessages(incoming_bundle)
1207                    if !mandatory_apps_need_accepted_message
1208                        || incoming_bundle.action == MessageAction::Accept =>
1209                {
1210                    for pending in incoming_bundle.messages() {
1211                        if let Message::User { application_id, .. } = &pending.message {
1212                            mandatory.remove(application_id);
1213                        }
1214                    }
1215                }
1216                Transaction::ReceiveMessages(_) => {}
1217            }
1218        }
1219        ensure!(
1220            mandatory.is_empty(),
1221            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1222        );
1223        Ok(())
1224    }
1225
1226    /// Returns the hashes of all blocks we have in the given range.
1227    ///
1228    /// If the input heights are in ascending order, the hashes will be in the same order.
1229    /// Otherwise they may be unordered.
1230    #[instrument(skip_all, fields(
1231        chain_id = %self.chain_id(),
1232        next_block_height = %self.tip_state.get().next_block_height,
1233    ))]
1234    pub async fn block_hashes(
1235        &self,
1236        heights: impl IntoIterator<Item = BlockHeight>,
1237    ) -> Result<Vec<CryptoHash>, ChainError> {
1238        let next_height = self.tip_state.get().next_block_height;
1239        // Everything up to (excluding) next_height is in confirmed_log.
1240        let (confirmed_heights, unconfirmed_heights) = heights
1241            .into_iter()
1242            .partition::<Vec<_>, _>(|height| *height < next_height);
1243        let confirmed_indices = confirmed_heights
1244            .into_iter()
1245            .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1246            .collect::<Result<_, _>>()?;
1247        let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1248        // Everything after (including) next_height in preprocessed_blocks if we have it.
1249        let unconfirmed_hashes = self
1250            .preprocessed_blocks
1251            .multi_get(&unconfirmed_heights)
1252            .await?;
1253        Ok(confirmed_hashes
1254            .into_iter()
1255            .chain(unconfirmed_hashes)
1256            .flatten()
1257            .collect())
1258    }
1259
1260    /// Resets the chain manager for the next block height.
1261    async fn reset_chain_manager(
1262        &mut self,
1263        next_height: BlockHeight,
1264        local_time: Timestamp,
1265    ) -> Result<(), ChainError> {
1266        let maybe_committee = self.execution_state.system.current_committee().await?;
1267        let ownership = self.execution_state.system.ownership.get().await?.clone();
1268        let fallback_owners = maybe_committee
1269            .iter()
1270            .flat_map(|(_, committee)| committee.account_keys_and_weights());
1271        self.pending_validated_blobs.clear();
1272        self.pending_proposed_blobs.clear();
1273        self.manager
1274            .reset(ownership, next_height, local_time, fallback_owners)
1275    }
1276
1277    /// Updates the outboxes with the messages sent in the block.
1278    ///
1279    /// Returns the set of all recipients.
1280    #[instrument(skip_all, fields(
1281        chain_id = %self.chain_id(),
1282        block_height = %block.header.height
1283    ))]
1284    async fn process_outgoing_messages(
1285        &mut self,
1286        block: &Block,
1287    ) -> Result<Vec<ChainId>, ChainError> {
1288        // Record the messages of the execution. Messages are understood within an
1289        // application.
1290        let recipients = block.recipients();
1291        let block_height = block.header.height;
1292        let next_height = self.tip_state.get().next_block_height;
1293
1294        // Update the outboxes.
1295        let outbox_counters = self.outbox_counters.get_mut();
1296        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1297        let targets = recipients.into_iter().collect::<Vec<_>>();
1298        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1299        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1300            if block_height > next_height {
1301                // There may be a gap in the chain before this block. We can only add it to this
1302                // outbox if the previous message to the same recipient has already been added.
1303                if *outbox.next_height_to_schedule.get() > block_height {
1304                    continue; // We already added this recipient's messages to the outbox.
1305                }
1306                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1307                {
1308                    // The block with the last added message has already been executed; look up its
1309                    // hash in the confirmed_log.
1310                    Some(height) if height < next_height => {
1311                        let index =
1312                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1313                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1314                            ChainError::CorruptedChainState("missing entry in confirmed_log".into())
1315                        })?)
1316                    }
1317                    // The block with last added message has not been executed yet. If we have it,
1318                    // it's in preprocessed_blocks.
1319                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1320                        || {
1321                            ChainError::CorruptedChainState(
1322                                "missing entry in preprocessed_blocks".into(),
1323                            )
1324                        },
1325                    )?),
1326                    None => None, // No message to that sender was added yet.
1327                };
1328                // Only schedule if this block contains the next message for that recipient.
1329                match (
1330                    maybe_prev_hash,
1331                    block.body.previous_message_blocks.get(target),
1332                ) {
1333                    (None, None) => {
1334                        // No previous message block expected and none indicated by the outbox -
1335                        // all good
1336                    }
1337                    (Some(_), None) => {
1338                        // Outbox indicates there was a previous message block, but
1339                        // previous_message_blocks has no idea about it - possible bug
1340                        return Err(ChainError::CorruptedChainState(
1341                            "block indicates no previous message block,\
1342                            but we have one in the outbox"
1343                                .into(),
1344                        ));
1345                    }
1346                    (None, Some((_, prev_msg_block_height))) => {
1347                        // We have no previously processed block in the outbox, but we are
1348                        // expecting one - this could be due to an empty outbox having been pruned.
1349                        // Only process the outbox if the height of the previous message block is
1350                        // lower than the tip
1351                        if *prev_msg_block_height >= next_height {
1352                            continue;
1353                        }
1354                    }
1355                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1356                        // Only process the outbox if the hashes match.
1357                        if prev_hash != prev_msg_block_hash {
1358                            continue;
1359                        }
1360                    }
1361                }
1362            }
1363            if outbox.schedule_message(block_height)? {
1364                *outbox_counters.entry(block_height).or_default() += 1;
1365                nonempty_outboxes.insert(*target);
1366            }
1367            #[cfg(with_metrics)]
1368            crate::outbox::metrics::OUTBOX_SIZE
1369                .with_label_values(&[])
1370                .observe(outbox.queue.count() as f64);
1371        }
1372
1373        #[cfg(with_metrics)]
1374        metrics::NUM_OUTBOXES
1375            .with_label_values(&[])
1376            .observe(nonempty_outboxes.len() as f64);
1377        Ok(targets)
1378    }
1379}
1380
/// Checks that `EMPTY_BLOCK_SIZE` matches the BCS-serialized size of a first block with a
/// default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}