linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6    sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11    crypto::{CryptoHash, ValidatorPublicKey},
12    data_types::{
13        ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14        OracleResponse, Timestamp,
15    },
16    ensure,
17    identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18    ownership::ChainOwnership,
19};
20use linera_execution::{
21    committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, Operation,
22    OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController, ResourceTracker,
23    ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26    bucket_queue_view::BucketQueueView,
27    context::Context,
28    log_view::LogView,
29    map_view::MapView,
30    reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31    register_view::RegisterView,
32    set_view::SetView,
33    views::{ClonableView, RootView, View},
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37
38use crate::{
39    block::{Block, ConfirmedBlock},
40    block_tracker::BlockExecutionTracker,
41    data_types::{
42        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
43        Transaction,
44    },
45    inbox::{Cursor, InboxError, InboxStateView},
46    manager::ChainManager,
47    outbox::OutboxStateView,
48    pending_blobs::PendingBlobsView,
49    ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
/// Prometheus metrics recorded while executing blocks and maintaining inboxes/outboxes.
///
/// The whole module is only compiled when `with_metrics` is set, so individual items do not
/// need their own `cfg` gate.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of block execution latencies.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram of message execution latencies.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of operation execution latencies.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the Wasm fuel consumed by each block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the EVM fuel consumed by each block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of read operations performed by each block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by each block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by each block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time spent recomputing the execution state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram of the number of inboxes, sampled whenever inboxes are updated.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram of the number of outboxes, sampled whenever outboxes are updated.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Tracks block execution metrics in Prometheus.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and records the fuel, read and write totals found in
    /// `tracker` in the corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        // The `as f64` conversions may lose precision for very large counts, which is
        // acceptable for histogram observations.
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
194
/// The BCS-serialized size of an empty [`Block`].
///
/// NOTE(review): this is a hard-coded byte count; it presumably has to be updated whenever the
/// serialized layout of [`Block`] changes — confirm that a test guards it.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
// This is the element type of `ChainStateView::unskippable_bundles`.
// NOTE(review): with `with_graphql`, `SimpleObject` presumably exports the `///` comments as
// schema descriptions, so their wording is deliberately left unchanged here.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor of the bundle.
    pub entry: BundleInInbox,
    /// The timestamp when the bundle was added to the inbox.
    pub seen: Timestamp,
}
207
/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
// Used both as the element type of `ChainStateView::removed_unskippable_bundles` and as the
// `entry` field of `TimestampedBundleInInbox`.
// NOTE(review): with `with_graphql`, `SimpleObject` presumably exports the `///` comments as
// schema descriptions, so their wording is deliberately left unchanged here.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct BundleInInbox {
    /// The origin from which we received the bundle.
    pub origin: ChainId,
    /// The cursor of the bundle in the inbox.
    pub cursor: Cursor,
}
217
// Bucket size of the `BucketQueueView` that stores `TimestampedBundleInInbox` entries.
// `TimestampedBundleInInbox` is a relatively small type, so storing 100 entries per bucket
// seems reasonable.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
221
/// A view accessing the state of a chain.
// NOTE(review): with `with_graphql`, `SimpleObject` presumably exports the `///` comments below
// as schema descriptions, so their wording is left unchanged; clarifications are added as plain
// `//` comments instead.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state, including system and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state.
    // Set by `initialize_if_needed` right after the chain is initialized.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Consensus state.
    pub manager: ChainManager<C>,
    /// Pending validated block that is still missing blobs.
    /// The incomplete set of blobs for the pending validated block.
    // NOTE(review): the two sentences above overlap; the second one matches the field type.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// The incomplete sets of blobs for upcoming proposals.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of all certified blocks for this sender.
    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
    // `block_hash` and `next_block_height` presumably refer to the fields of `tip_state`.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
    pub received_log: LogView<C, ChainAndHeight>,
    /// The number of `received_log` entries we have synchronized, for each validator.
    // Only ever moved forward; see `update_received_certificate_trackers`.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Mailboxes used to receive messages indexed by their origin.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// Unskippable bundles that have been removed but are still in the queue.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// The heights of previous blocks that sent messages to the same recipients.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The heights of previous blocks that published events to the same streams.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// Mailboxes used to send messages, indexed by their target.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// Number of outgoing messages in flight for each block height.
    /// We use a `RegisterView` to prioritize speed for small maps.
    // Entries are removed as soon as their count reaches zero (see
    // `mark_messages_as_received`), so the first key is the lowest height that still has
    // messages in flight; `all_messages_delivered_up_to` relies on this.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Outboxes with at least one pending message. This allows us to avoid loading all outboxes.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Blocks that have been verified but not executed yet, and that may not be contiguous.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
280
/// Block-chaining state.
// NOTE(review): with `with_graphql`, `SimpleObject` presumably exports the `///` comments below
// as schema descriptions, so their wording is left unchanged.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest certified block in this chain, if any.
    pub block_hash: Option<CryptoHash>,
    /// Sequence number tracking blocks.
    pub next_block_height: BlockHeight,
    // The three counters below are cumulative: `update_counters` only ever adds to them.
    /// Number of incoming message bundles.
    pub num_incoming_bundles: u32,
    /// Number of operations.
    pub num_operations: u32,
    /// Number of outgoing messages.
    pub num_outgoing_messages: u32,
}
296
297impl ChainTipState {
298    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
299    /// expected parent.
300    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
301        ensure!(
302            new_block.height == self.next_block_height,
303            ChainError::UnexpectedBlockHeight {
304                expected_block_height: self.next_block_height,
305                found_block_height: new_block.height
306            }
307        );
308        ensure!(
309            new_block.previous_block_hash == self.block_hash,
310            ChainError::UnexpectedPreviousBlockHash
311        );
312        Ok(())
313    }
314
315    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
316    /// it is higher than the tip.
317    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
318        ensure!(
319            self.next_block_height >= height,
320            ChainError::MissingEarlierBlocks {
321                current_block_height: self.next_block_height,
322            }
323        );
324        Ok(self.next_block_height > height)
325    }
326
327    /// Checks if the measurement counters would be valid.
328    pub fn update_counters(
329        &mut self,
330        transactions: &[Transaction],
331        messages: &[Vec<OutgoingMessage>],
332    ) -> Result<(), ChainError> {
333        let mut num_incoming_bundles = 0u32;
334        let mut num_operations = 0u32;
335
336        for transaction in transactions {
337            match transaction {
338                Transaction::ReceiveMessages(_) => {
339                    num_incoming_bundles = num_incoming_bundles
340                        .checked_add(1)
341                        .ok_or(ArithmeticError::Overflow)?;
342                }
343                Transaction::ExecuteOperation(_) => {
344                    num_operations = num_operations
345                        .checked_add(1)
346                        .ok_or(ArithmeticError::Overflow)?;
347                }
348            }
349        }
350
351        self.num_incoming_bundles = self
352            .num_incoming_bundles
353            .checked_add(num_incoming_bundles)
354            .ok_or(ArithmeticError::Overflow)?;
355
356        self.num_operations = self
357            .num_operations
358            .checked_add(num_operations)
359            .ok_or(ArithmeticError::Overflow)?;
360
361        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
362            .map_err(|_| ArithmeticError::Overflow)?;
363        self.num_outgoing_messages = self
364            .num_outgoing_messages
365            .checked_add(num_outgoing_messages)
366            .ok_or(ArithmeticError::Overflow)?;
367
368        Ok(())
369    }
370}
371
372impl<C> ChainStateView<C>
373where
374    C: Context + Clone + 'static,
375    C::Extra: ExecutionRuntimeContext,
376{
377    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
378    pub fn chain_id(&self) -> ChainId {
379        self.context().extra().chain_id()
380    }
381
382    #[instrument(skip_all, fields(
383        chain_id = %self.chain_id(),
384    ))]
385    pub async fn query_application(
386        &mut self,
387        local_time: Timestamp,
388        query: Query,
389        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
390    ) -> Result<QueryOutcome, ChainError> {
391        let context = QueryContext {
392            chain_id: self.chain_id(),
393            next_block_height: self.tip_state.get().next_block_height,
394            local_time,
395        };
396        self.execution_state
397            .query_application(context, query, service_runtime_endpoint)
398            .await
399            .with_execution_context(ChainExecutionContext::Query)
400    }
401
402    #[instrument(skip_all, fields(
403        chain_id = %self.chain_id(),
404        application_id = %application_id
405    ))]
406    pub async fn describe_application(
407        &mut self,
408        application_id: ApplicationId,
409    ) -> Result<ApplicationDescription, ChainError> {
410        self.execution_state
411            .system
412            .describe_application(application_id, &mut TransactionTracker::default())
413            .await
414            .with_execution_context(ChainExecutionContext::DescribeApplication)
415    }
416
417    #[instrument(skip_all, fields(
418        chain_id = %self.chain_id(),
419        target = %target,
420        height = %height
421    ))]
422    pub async fn mark_messages_as_received(
423        &mut self,
424        target: &ChainId,
425        height: BlockHeight,
426    ) -> Result<bool, ChainError> {
427        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
428        let updates = outbox.mark_messages_as_received(height).await?;
429        if updates.is_empty() {
430            return Ok(false);
431        }
432        for update in updates {
433            let counter = self
434                .outbox_counters
435                .get_mut()
436                .get_mut(&update)
437                .ok_or_else(|| {
438                    ChainError::InternalError("message counter should be present".into())
439                })?;
440            *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
441            if *counter == 0 {
442                // Important for the test in `all_messages_delivered_up_to`.
443                self.outbox_counters.get_mut().remove(&update);
444            }
445        }
446        if outbox.queue.count() == 0 {
447            self.nonempty_outboxes.get_mut().remove(target);
448            // If the outbox is empty and not ahead of the executed blocks, remove it.
449            if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
450                self.outboxes.remove_entry(target)?;
451            }
452        }
453        #[cfg(with_metrics)]
454        metrics::NUM_OUTBOXES
455            .with_label_values(&[])
456            .observe(self.outboxes.count().await? as f64);
457        Ok(true)
458    }
459
460    /// Returns true if there are no more outgoing messages in flight up to the given
461    /// block height.
462    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
463        tracing::debug!(
464            "Messages left in {:.8}'s outbox: {:?}",
465            self.chain_id(),
466            self.outbox_counters.get()
467        );
468        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
469            key > &height
470        } else {
471            true
472        }
473    }
474
475    /// Invariant for the states of active chains.
476    pub fn is_active(&self) -> bool {
477        self.execution_state.system.is_active()
478    }
479
480    /// Initializes the chain if it is not active yet.
481    pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
482        // Initialize ourselves.
483        if self
484            .execution_state
485            .system
486            .initialize_chain(self.chain_id())
487            .await
488            .with_execution_context(ChainExecutionContext::Block)?
489        {
490            // The chain was already initialized.
491            return Ok(());
492        }
493        // Recompute the state hash.
494        let hash = self.execution_state.crypto_hash_mut().await?;
495        self.execution_state_hash.set(Some(hash));
496        let maybe_committee = self.execution_state.system.current_committee().into_iter();
497        // Last, reset the consensus state based on the current ownership.
498        self.manager.reset(
499            self.execution_state.system.ownership.get().clone(),
500            BlockHeight(0),
501            local_time,
502            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
503        )?;
504        Ok(())
505    }
506
507    pub async fn next_block_height_to_receive(
508        &self,
509        origin: &ChainId,
510    ) -> Result<BlockHeight, ChainError> {
511        let inbox = self.inboxes.try_load_entry(origin).await?;
512        match inbox {
513            Some(inbox) => inbox.next_block_height_to_receive(),
514            None => Ok(BlockHeight::ZERO),
515        }
516    }
517
518    /// Returns the height of the highest block we have, plus one. Includes preprocessed blocks.
519    ///
520    /// The "+ 1" is so that it can be used in the same places as `next_block_height`.
521    pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
522        if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
523            return Ok(height.saturating_add(BlockHeight(1)));
524        }
525        Ok(self.tip_state.get().next_block_height)
526    }
527
528    pub async fn last_anticipated_block_height(
529        &self,
530        origin: &ChainId,
531    ) -> Result<Option<BlockHeight>, ChainError> {
532        let inbox = self.inboxes.try_load_entry(origin).await?;
533        match inbox {
534            Some(inbox) => match inbox.removed_bundles.back().await? {
535                Some(bundle) => Ok(Some(bundle.height)),
536                None => Ok(None),
537            },
538            None => Ok(None),
539        }
540    }
541
    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
    /// internal error if the bundle doesn't appear to be new, based on the sender's
    /// height. The value `local_time` is specific to each validator and only used for
    /// round timeouts.
    ///
    /// The chain is initialized first if needed. A failure to initialize that is only due to
    /// this chain's own missing chain description blob is tolerated, so that the inbox can
    /// still be updated. If `add_to_received_log` is `true`, the origin and height of the
    /// bundle are also appended to `received_log`.
    ///
    /// # Errors
    ///
    /// Returns any other initialization error unchanged, a [`ChainError::ViewError`] for view
    /// errors raised by the inbox, and a [`ChainError::InternalError`] for every other inbox
    /// error.
    ///
    /// # Panics
    ///
    /// Panics if `bundle.messages` is empty.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
        origin = %origin,
        bundle_height = %bundle.height
    ))]
    pub async fn receive_message_bundle(
        &mut self,
        origin: &ChainId,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<(), ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        // Captured before `bundle` is moved into the inbox below.
        let chain_and_height = ChainAndHeight {
            chain_id: *origin,
            height: bundle.height,
        };

        match self.initialize_if_needed(local_time).await {
            Ok(_) => (),
            // if the only issue was that we couldn't initialize the chain because of a
            // missing chain description blob, we might still want to update the inbox
            Err(ChainError::ExecutionError(exec_err, _))
                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
                if blobs.iter().all(|blob_id| {
                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
                })) => {}
            // Any other error is returned to the caller as is.
            err => {
                return err;
            }
        }

        // Process the inbox bundle and update the inbox state.
        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        inbox
            .add_bundle(bundle)
            .await
            .map_err(|error| match error {
                InboxError::ViewError(error) => ChainError::ViewError(error),
                error => ChainError::InternalError(format!(
                    "while processing messages in certified block: {error}"
                )),
            })?;

        // Remember the certificate for future validator/client synchronizations.
        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(())
    }
607
608    /// Updates the `received_log` trackers.
609    pub fn update_received_certificate_trackers(
610        &mut self,
611        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
612    ) {
613        for (name, tracker) in new_trackers {
614            self.received_certificate_trackers
615                .get_mut()
616                .entry(name)
617                .and_modify(|t| {
618                    // Because several synchronizations could happen in parallel, we need to make
619                    // sure to never go backward.
620                    if tracker > *t {
621                        *t = tracker;
622                    }
623                })
624                .or_insert(tracker);
625        }
626    }
627
628    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
629        self.execution_state
630            .system
631            .current_committee()
632            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
633    }
634
635    pub fn ownership(&self) -> &ChainOwnership {
636        self.execution_state.system.ownership.get()
637    }
638
639    /// Removes the incoming message bundles in the block from the inboxes.
640    ///
641    /// If `must_be_present` is `true`, an error is returned if any of the bundles have not been
642    /// added to the inbox yet. So this should be `true` if the bundles are in a block _proposal_,
643    /// and `false` if the block is already confirmed.
644    #[instrument(skip_all, fields(
645        chain_id = %self.chain_id(),
646    ))]
647    pub async fn remove_bundles_from_inboxes(
648        &mut self,
649        timestamp: Timestamp,
650        must_be_present: bool,
651        incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
652    ) -> Result<(), ChainError> {
653        let chain_id = self.chain_id();
654        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
655        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
656            ensure!(
657                bundle.timestamp <= timestamp,
658                ChainError::IncorrectBundleTimestamp {
659                    chain_id,
660                    bundle_timestamp: bundle.timestamp,
661                    block_timestamp: timestamp,
662                }
663            );
664            let bundles = bundles_by_origin.entry(*origin).or_default();
665            bundles.push(bundle);
666        }
667        let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
668        let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
669        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
670            tracing::trace!(
671                "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
672                bundles
673                    .iter()
674                    .map(|bundle| bundle.height.to_string())
675                    .collect::<Vec<_>>()
676                    .join(", ")
677            );
678            for bundle in bundles {
679                // Mark the message as processed in the inbox.
680                let was_present = inbox
681                    .remove_bundle(bundle)
682                    .await
683                    .map_err(|error| (chain_id, origin, error))?;
684                if must_be_present {
685                    ensure!(
686                        was_present,
687                        ChainError::MissingCrossChainUpdate {
688                            chain_id,
689                            origin,
690                            height: bundle.height,
691                        }
692                    );
693                }
694            }
695        }
696        #[cfg(with_metrics)]
697        metrics::NUM_INBOXES
698            .with_label_values(&[])
699            .observe(self.inboxes.count().await? as f64);
700        Ok(())
701    }
702
703    /// Returns the chain IDs of all recipients for which a message is waiting in the outbox.
704    pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
705        self.nonempty_outboxes.get().iter().copied().collect()
706    }
707
708    /// Returns the outboxes for the given targets, or an error if any of them are missing.
709    pub async fn load_outboxes(
710        &self,
711        targets: &[ChainId],
712    ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
713        let vec_of_options = self.outboxes.try_load_entries(targets).await?;
714        let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
715        optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
716    }
717
718    /// Executes a block: first the incoming messages, then the main operation.
719    /// Does not update chain state other than the execution state.
720    #[instrument(skip_all, fields(
721        chain_id = %block.chain_id,
722        block_height = %block.height
723    ))]
724    #[expect(clippy::too_many_arguments)]
725    async fn execute_block_inner(
726        chain: &mut ExecutionStateView<C>,
727        confirmed_log: &LogView<C, CryptoHash>,
728        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
729        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
730        block: &ProposedBlock,
731        local_time: Timestamp,
732        round: Option<u32>,
733        published_blobs: &[Blob],
734        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
735    ) -> Result<BlockExecutionOutcome, ChainError> {
736        #[cfg(with_metrics)]
737        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
738        chain.system.timestamp.set(block.timestamp);
739
740        let policy = chain
741            .system
742            .current_committee()
743            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
744            .1
745            .policy()
746            .clone();
747
748        let mut resource_controller = ResourceController::new(
749            Arc::new(policy),
750            ResourceTracker::default(),
751            block.authenticated_signer,
752        );
753
754        for blob in published_blobs {
755            let blob_id = blob.id();
756            resource_controller
757                .policy()
758                .check_blob_size(blob.content())
759                .with_execution_context(ChainExecutionContext::Block)?;
760            chain.system.used_blobs.insert(&blob_id)?;
761        }
762
763        // Execute each incoming bundle as a transaction, then each operation.
764        // Collect messages, events and oracle responses, each as one list per transaction.
765        let mut block_execution_tracker = BlockExecutionTracker::new(
766            &mut resource_controller,
767            published_blobs
768                .iter()
769                .map(|blob| (blob.id(), blob))
770                .collect(),
771            local_time,
772            replaying_oracle_responses,
773            block,
774        )?;
775
776        for transaction in block.transaction_refs() {
777            block_execution_tracker
778                .execute_transaction(transaction, round, chain)
779                .await?;
780        }
781
782        let recipients = block_execution_tracker.recipients();
783        let heights = previous_message_blocks_view.multi_get(&recipients).await?;
784        let mut recipient_heights = Vec::new();
785        let mut indices = Vec::new();
786        for (height, recipient) in heights.into_iter().zip(recipients) {
787            if let Some(height) = height {
788                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
789                indices.push(index);
790                recipient_heights.push((recipient, height));
791            }
792        }
793        let hashes = confirmed_log.multi_get(indices).await?;
794        let mut previous_message_blocks = BTreeMap::new();
795        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
796            let hash = hash.ok_or_else(|| {
797                ChainError::InternalError("missing entry in confirmed_log".into())
798            })?;
799            previous_message_blocks.insert(recipient, (hash, height));
800        }
801
802        let streams = block_execution_tracker.event_streams();
803        let heights = previous_event_blocks_view.multi_get(&streams).await?;
804        let mut stream_heights = Vec::new();
805        let mut indices = Vec::new();
806        for (stream, height) in streams.into_iter().zip(heights) {
807            if let Some(height) = height {
808                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
809                indices.push(index);
810                stream_heights.push((stream, height));
811            }
812        }
813        let hashes = confirmed_log.multi_get(indices).await?;
814        let mut previous_event_blocks = BTreeMap::new();
815        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
816            let hash = hash.ok_or_else(|| {
817                ChainError::InternalError("missing entry in confirmed_log".into())
818            })?;
819            previous_event_blocks.insert(stream, (hash, height));
820        }
821
822        let state_hash = {
823            #[cfg(with_metrics)]
824            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
825            chain.crypto_hash_mut().await?
826        };
827
828        let (messages, oracle_responses, events, blobs, operation_results) =
829            block_execution_tracker.finalize();
830
831        Ok(BlockExecutionOutcome {
832            messages,
833            previous_message_blocks,
834            previous_event_blocks,
835            state_hash,
836            oracle_responses,
837            events,
838            blobs,
839            operation_results,
840        })
841    }
842
843    /// Executes a block: first the incoming messages, then the main operation.
844    /// Does not update chain state other than the execution state.
845    #[instrument(skip_all, fields(
846        chain_id = %self.chain_id(),
847        block_height = %block.height
848    ))]
849    pub async fn execute_block(
850        &mut self,
851        block: &ProposedBlock,
852        local_time: Timestamp,
853        round: Option<u32>,
854        published_blobs: &[Blob],
855        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
856    ) -> Result<BlockExecutionOutcome, ChainError> {
857        assert_eq!(
858            block.chain_id,
859            self.execution_state.context().extra().chain_id()
860        );
861
862        self.initialize_if_needed(local_time).await?;
863
864        let chain_timestamp = *self.execution_state.system.timestamp.get();
865        ensure!(
866            chain_timestamp <= block.timestamp,
867            ChainError::InvalidBlockTimestamp {
868                parent: chain_timestamp,
869                new: block.timestamp
870            }
871        );
872        ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
873
874        ensure!(
875            block.published_blob_ids()
876                == published_blobs
877                    .iter()
878                    .map(|blob| blob.id())
879                    .collect::<BTreeSet<_>>(),
880            ChainError::InternalError("published_blobs mismatch".to_string())
881        );
882
883        if *self.execution_state.system.closed.get() {
884            ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
885        }
886
887        Self::check_app_permissions(
888            self.execution_state.system.application_permissions.get(),
889            block,
890        )?;
891
892        Self::execute_block_inner(
893            &mut self.execution_state,
894            &self.confirmed_log,
895            &self.previous_message_blocks,
896            &self.previous_event_blocks,
897            block,
898            local_time,
899            round,
900            published_blobs,
901            replaying_oracle_responses,
902        )
903        .await
904    }
905
906    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
907    /// manager. This does not touch the execution state itself, which must be updated separately.
908    /// Returns the set of event streams that were updated as a result of applying the block.
909    #[instrument(skip_all, fields(
910        chain_id = %self.chain_id(),
911        block_height = %block.inner().inner().header.height
912    ))]
913    pub async fn apply_confirmed_block(
914        &mut self,
915        block: &ConfirmedBlock,
916        local_time: Timestamp,
917    ) -> Result<(), ChainError> {
918        let hash = block.inner().hash();
919        let block = block.inner().inner();
920        self.execution_state_hash.set(Some(block.header.state_hash));
921        let recipients = self.process_outgoing_messages(block).await?;
922
923        for recipient in recipients {
924            self.previous_message_blocks
925                .insert(&recipient, block.header.height)?;
926        }
927        for event in block.body.events.iter().flatten() {
928            self.previous_event_blocks
929                .insert(&event.stream_id, block.header.height)?;
930        }
931        // Last, reset the consensus state based on the current ownership.
932        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
933
934        // Advance to next block height.
935        let tip = self.tip_state.get_mut();
936        tip.block_hash = Some(hash);
937        tip.next_block_height.try_add_assign_one()?;
938        tip.update_counters(&block.body.transactions, &block.body.messages)?;
939        self.confirmed_log.push(hash);
940        self.preprocessed_blocks.remove(&block.header.height)?;
941        Ok(())
942    }
943
944    /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
945    #[instrument(skip_all, fields(
946        chain_id = %self.chain_id(),
947        block_height = %block.inner().inner().header.height
948    ))]
949    pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
950        let hash = block.inner().hash();
951        let block = block.inner().inner();
952        let height = block.header.height;
953        if height < self.tip_state.get().next_block_height {
954            return Ok(());
955        }
956        self.process_outgoing_messages(block).await?;
957        self.preprocessed_blocks.insert(&height, hash)?;
958        Ok(())
959    }
960
961    /// Returns whether this is a child chain.
962    pub fn is_child(&self) -> bool {
963        let Some(description) = self.execution_state.system.description.get() else {
964            // Root chains are always initialized, so this must be a child chain.
965            return true;
966        };
967        description.is_child()
968    }
969
970    /// Verifies that the block is valid according to the chain's application permission settings.
971    #[instrument(skip_all, fields(
972        block_height = %block.height,
973        num_transactions = %block.transactions.len()
974    ))]
975    fn check_app_permissions(
976        app_permissions: &ApplicationPermissions,
977        block: &ProposedBlock,
978    ) -> Result<(), ChainError> {
979        let mut mandatory = HashSet::<ApplicationId>::from_iter(
980            app_permissions.mandatory_applications.iter().copied(),
981        );
982        for transaction in &block.transactions {
983            match transaction {
984                Transaction::ExecuteOperation(operation)
985                    if operation.is_exempt_from_permissions() =>
986                {
987                    mandatory.clear()
988                }
989                Transaction::ExecuteOperation(operation) => {
990                    ensure!(
991                        app_permissions.can_execute_operations(&operation.application_id()),
992                        ChainError::AuthorizedApplications(
993                            app_permissions.execute_operations.clone().unwrap()
994                        )
995                    );
996                    if let Operation::User { application_id, .. } = operation {
997                        mandatory.remove(application_id);
998                    }
999                }
1000                Transaction::ReceiveMessages(incoming_bundle) => {
1001                    for pending in incoming_bundle.messages() {
1002                        if let Message::User { application_id, .. } = &pending.message {
1003                            mandatory.remove(application_id);
1004                        }
1005                    }
1006                }
1007            }
1008        }
1009        ensure!(
1010            mandatory.is_empty(),
1011            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1012        );
1013        Ok(())
1014    }
1015
1016    /// Returns the hashes of all blocks we have in the given range.
1017    ///
1018    /// If the input heights are in ascending order, the hashes will be in the same order.
1019    /// Otherwise they may be unordered.
1020    #[instrument(skip_all, fields(
1021        chain_id = %self.chain_id(),
1022        next_block_height = %self.tip_state.get().next_block_height,
1023    ))]
1024    pub async fn block_hashes(
1025        &self,
1026        heights: impl IntoIterator<Item = BlockHeight>,
1027    ) -> Result<Vec<CryptoHash>, ChainError> {
1028        let next_height = self.tip_state.get().next_block_height;
1029        // Everything up to (excluding) next_height is in confirmed_log.
1030        let (confirmed_heights, unconfirmed_heights) = heights
1031            .into_iter()
1032            .partition::<Vec<_>, _>(|height| *height < next_height);
1033        let confirmed_indices = confirmed_heights
1034            .into_iter()
1035            .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1036            .collect::<Result<_, _>>()?;
1037        let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1038        // Everything after (including) next_height in preprocessed_blocks if we have it.
1039        let unconfirmed_hashes = self
1040            .preprocessed_blocks
1041            .multi_get(&unconfirmed_heights)
1042            .await?;
1043        Ok(confirmed_hashes
1044            .into_iter()
1045            .chain(unconfirmed_hashes)
1046            .flatten()
1047            .collect())
1048    }
1049
1050    /// Resets the chain manager for the next block height.
1051    fn reset_chain_manager(
1052        &mut self,
1053        next_height: BlockHeight,
1054        local_time: Timestamp,
1055    ) -> Result<(), ChainError> {
1056        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1057        let ownership = self.execution_state.system.ownership.get().clone();
1058        let fallback_owners =
1059            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1060        self.pending_validated_blobs.clear();
1061        self.pending_proposed_blobs.clear();
1062        self.manager
1063            .reset(ownership, next_height, local_time, fallback_owners)
1064    }
1065
1066    /// Updates the outboxes with the messages sent in the block.
1067    ///
1068    /// Returns the set of all recipients.
1069    #[instrument(skip_all, fields(
1070        chain_id = %self.chain_id(),
1071        block_height = %block.header.height
1072    ))]
1073    async fn process_outgoing_messages(
1074        &mut self,
1075        block: &Block,
1076    ) -> Result<Vec<ChainId>, ChainError> {
1077        // Record the messages of the execution. Messages are understood within an
1078        // application.
1079        let recipients = block.recipients();
1080        let block_height = block.header.height;
1081        let next_height = self.tip_state.get().next_block_height;
1082
1083        // Update the outboxes.
1084        let outbox_counters = self.outbox_counters.get_mut();
1085        let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1086        let targets = recipients.into_iter().collect::<Vec<_>>();
1087        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1088        for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1089            if block_height > next_height {
1090                // There may be a gap in the chain before this block. We can only add it to this
1091                // outbox if the previous message to the same recipient has already been added.
1092                if *outbox.next_height_to_schedule.get() > block_height {
1093                    continue; // We already added this recipient's messages to the outbox.
1094                }
1095                let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1096                {
1097                    // The block with the last added message has already been executed; look up its
1098                    // hash in the confirmed_log.
1099                    Some(height) if height < next_height => {
1100                        let index =
1101                            usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1102                        Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1103                            ChainError::InternalError("missing entry in confirmed_log".into())
1104                        })?)
1105                    }
1106                    // The block with last added message has not been executed yet. If we have it,
1107                    // it's in preprocessed_blocks.
1108                    Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1109                        || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1110                    )?),
1111                    None => None, // No message to that sender was added yet.
1112                };
1113                // Only schedule if this block contains the next message for that recipient.
1114                match (
1115                    maybe_prev_hash,
1116                    block.body.previous_message_blocks.get(target),
1117                ) {
1118                    (None, None) => {
1119                        // No previous message block expected and none indicated by the outbox -
1120                        // all good
1121                    }
1122                    (Some(_), None) => {
1123                        // Outbox indicates there was a previous message block, but
1124                        // previous_message_blocks has no idea about it - possible bug
1125                        return Err(ChainError::InternalError(
1126                            "block indicates no previous message block,\
1127                            but we have one in the outbox"
1128                                .into(),
1129                        ));
1130                    }
1131                    (None, Some((_, prev_msg_block_height))) => {
1132                        // We have no previously processed block in the outbox, but we are
1133                        // expecting one - this could be due to an empty outbox having been pruned.
1134                        // Only process the outbox if the height of the previous message block is
1135                        // lower than the tip
1136                        if *prev_msg_block_height >= next_height {
1137                            continue;
1138                        }
1139                    }
1140                    (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1141                        // Only process the outbox if the hashes match.
1142                        if prev_hash != prev_msg_block_hash {
1143                            continue;
1144                        }
1145                    }
1146                }
1147            }
1148            if outbox.schedule_message(block_height)? {
1149                *outbox_counters.entry(block_height).or_default() += 1;
1150                nonempty_outboxes.insert(*target);
1151            }
1152        }
1153
1154        #[cfg(with_metrics)]
1155        metrics::NUM_OUTBOXES
1156            .with_label_values(&[])
1157            .observe(self.outboxes.count().await? as f64);
1158        Ok(targets)
1159    }
1160}
1161
/// Checks that the BCS-serialized size of a first block with a default execution outcome
/// equals `EMPTY_BLOCK_SIZE`.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}