linera_chain/
chain.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(with_metrics)]
5use std::sync::LazyLock;
6use std::{
7    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8    sync::Arc,
9};
10
11use async_graphql::SimpleObject;
12use futures::stream::{self, StreamExt, TryStreamExt};
13use linera_base::{
14    crypto::{CryptoHash, ValidatorPublicKey},
15    data_types::{
16        Amount, ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
17        OracleResponse, Timestamp,
18    },
19    ensure,
20    identifiers::{
21        AccountOwner, ApplicationId, BlobType, ChainId, ChannelFullName, Destination,
22        GenericApplicationId, MessageId,
23    },
24    ownership::ChainOwnership,
25};
26use linera_execution::{
27    committee::{Committee, Epoch},
28    system::OpenChainConfig,
29    ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation,
30    OperationContext, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
31    ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
32};
33use linera_views::{
34    context::Context,
35    log_view::LogView,
36    map_view::MapView,
37    queue_view::QueueView,
38    reentrant_collection_view::ReentrantCollectionView,
39    register_view::RegisterView,
40    set_view::SetView,
41    views::{ClonableView, CryptoHashView, RootView, View},
42};
43use serde::{Deserialize, Serialize};
44
45use crate::{
46    block::{Block, ConfirmedBlock},
47    data_types::{
48        BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageAction, MessageBundle,
49        OperationResult, Origin, PostedMessage, ProposedBlock, Target, Transaction,
50    },
51    inbox::{Cursor, InboxError, InboxStateView},
52    manager::ChainManager,
53    outbox::OutboxStateView,
54    pending_blobs::PendingBlobsView,
55    ChainError, ChainExecutionContext, ExecutionResultExt,
56};
57
58#[cfg(test)]
59#[path = "unit_tests/chain_tests.rs"]
60mod chain_tests;
61
62#[cfg(with_metrics)]
63use {
64    linera_base::prometheus_util::{
65        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
66        register_int_counter_vec, MeasureLatency,
67    },
68    prometheus::{HistogramVec, IntCounterVec},
69};
70
71#[cfg(with_metrics)]
72static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
73    register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
74});
75
76#[cfg(with_metrics)]
77static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
78    register_histogram_vec(
79        "block_execution_latency",
80        "Block execution latency",
81        &[],
82        exponential_bucket_latencies(1000.0),
83    )
84});
85
86#[cfg(with_metrics)]
87static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
88    register_histogram_vec(
89        "message_execution_latency",
90        "Message execution latency",
91        &[],
92        exponential_bucket_latencies(50.0),
93    )
94});
95
96#[cfg(with_metrics)]
97static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
98    register_histogram_vec(
99        "operation_execution_latency",
100        "Operation execution latency",
101        &[],
102        exponential_bucket_latencies(50.0),
103    )
104});
105
106#[cfg(with_metrics)]
107static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
108    register_histogram_vec(
109        "wasm_fuel_used_per_block",
110        "Wasm fuel used per block",
111        &[],
112        exponential_bucket_interval(10.0, 1_000_000.0),
113    )
114});
115
116#[cfg(with_metrics)]
117static WASM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
118    register_histogram_vec(
119        "wasm_num_reads_per_block",
120        "Wasm number of reads per block",
121        &[],
122        exponential_bucket_interval(0.1, 100.0),
123    )
124});
125
126#[cfg(with_metrics)]
127static WASM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
128    register_histogram_vec(
129        "wasm_bytes_read_per_block",
130        "Wasm number of bytes read per block",
131        &[],
132        exponential_bucket_interval(0.1, 10_000_000.0),
133    )
134});
135
136#[cfg(with_metrics)]
137static WASM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
138    register_histogram_vec(
139        "wasm_bytes_written_per_block",
140        "Wasm number of bytes written per block",
141        &[],
142        exponential_bucket_interval(0.1, 10_000_000.0),
143    )
144});
145
146#[cfg(with_metrics)]
147static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
148    register_histogram_vec(
149        "state_hash_computation_latency",
150        "Time to recompute the state hash",
151        &[],
152        exponential_bucket_latencies(10.0),
153    )
154});
155
156#[cfg(with_metrics)]
157static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
158    register_histogram_vec(
159        "num_inboxes",
160        "Number of inboxes",
161        &[],
162        exponential_bucket_interval(1.0, 10_000.0),
163    )
164});
165
166#[cfg(with_metrics)]
167static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
168    register_histogram_vec(
169        "num_outboxes",
170        "Number of outboxes",
171        &[],
172        exponential_bucket_interval(1.0, 10_000.0),
173    )
174});
175
176/// The BCS-serialized size of an empty [`Block`].
177const EMPTY_BLOCK_SIZE: usize = 94;
178
179/// An origin, cursor and timestamp of a unskippable bundle in our inbox.
180#[derive(Debug, Clone, Serialize, Deserialize, async_graphql::SimpleObject)]
181pub struct TimestampedBundleInInbox {
182    /// The origin and cursor of the bundle.
183    pub entry: BundleInInbox,
184    /// The timestamp when the bundle was added to the inbox.
185    pub seen: Timestamp,
186}
187
188/// An origin and cursor of a unskippable bundle that is no longer in our inbox.
189#[derive(
190    Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, async_graphql::SimpleObject,
191)]
192pub struct BundleInInbox {
193    /// The origin from which we received the bundle.
194    pub origin: Origin,
195    /// The cursor of the bundle in the inbox.
196    pub cursor: Cursor,
197}
198
199impl BundleInInbox {
200    fn new(origin: Origin, bundle: &MessageBundle) -> Self {
201        BundleInInbox {
202            cursor: Cursor::from(bundle),
203            origin,
204        }
205    }
206}
207
208/// A view accessing the state of a chain.
209#[derive(Debug, RootView, ClonableView, SimpleObject)]
210#[graphql(cache_control(no_cache))]
211pub struct ChainStateView<C>
212where
213    C: Clone + Context + Send + Sync + 'static,
214{
215    /// Execution state, including system and user applications.
216    pub execution_state: ExecutionStateView<C>,
217    /// Hash of the execution state.
218    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,
219
220    /// Block-chaining state.
221    pub tip_state: RegisterView<C, ChainTipState>,
222
223    /// Consensus state.
224    pub manager: ChainManager<C>,
225    /// Pending validated block that is still missing blobs.
226    /// The incomplete set of blobs for the pending validated block.
227    pub pending_validated_blobs: PendingBlobsView<C>,
228    /// The incomplete sets of blobs for upcoming proposals.
229    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,
230
231    /// Hashes of all certified blocks for this sender.
232    /// This ends with `block_hash` and has length `usize::from(next_block_height)`.
233    pub confirmed_log: LogView<C, CryptoHash>,
234    /// Sender chain and height of all certified blocks known as a receiver (local ordering).
235    pub received_log: LogView<C, ChainAndHeight>,
236    /// The number of `received_log` entries we have synchronized, for each validator.
237    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,
238
239    /// Mailboxes used to receive messages indexed by their origin.
240    pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
241    /// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
242    pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
243    /// Unskippable bundles that have been removed but are still in the queue.
244    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
245    /// The heights of previous blocks that sent messages to the same recipients.
246    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
247    /// Mailboxes used to send messages, indexed by their target.
248    pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
249    /// Number of outgoing messages in flight for each block height.
250    /// We use a `RegisterView` to prioritize speed for small maps.
251    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
252    /// Channels able to multicast messages to subscribers.
253    pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
254}
255
256/// Block-chaining state.
257#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
258pub struct ChainTipState {
259    /// Hash of the latest certified block in this chain, if any.
260    pub block_hash: Option<CryptoHash>,
261    /// Sequence number tracking blocks.
262    pub next_block_height: BlockHeight,
263    /// Number of incoming message bundles.
264    pub num_incoming_bundles: u32,
265    /// Number of operations.
266    pub num_operations: u32,
267    /// Number of outgoing messages.
268    pub num_outgoing_messages: u32,
269}
270
271impl ChainTipState {
272    /// Checks that the proposed block is suitable, i.e. at the expected height and with the
273    /// expected parent.
274    pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
275        ensure!(
276            new_block.height == self.next_block_height,
277            ChainError::UnexpectedBlockHeight {
278                expected_block_height: self.next_block_height,
279                found_block_height: new_block.height
280            }
281        );
282        ensure!(
283            new_block.previous_block_hash == self.block_hash,
284            ChainError::UnexpectedPreviousBlockHash
285        );
286        Ok(())
287    }
288
289    /// Returns `true` if the validated block's height is below the tip height. Returns an error if
290    /// it is higher than the tip.
291    pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
292        ensure!(
293            self.next_block_height >= height,
294            ChainError::MissingEarlierBlocks {
295                current_block_height: self.next_block_height,
296            }
297        );
298        Ok(self.next_block_height > height)
299    }
300
301    /// Returns `true` if the next block will be the first, i.e. the chain doesn't have any blocks.
302    pub fn is_first_block(&self) -> bool {
303        self.next_block_height == BlockHeight::ZERO
304    }
305
306    /// Checks if the measurement counters would be valid.
307    pub fn update_counters(
308        &mut self,
309        incoming_bundles: &[IncomingBundle],
310        operations: &[Operation],
311        messages: &[Vec<OutgoingMessage>],
312    ) -> Result<(), ChainError> {
313        let num_incoming_bundles =
314            u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
315        self.num_incoming_bundles = self
316            .num_incoming_bundles
317            .checked_add(num_incoming_bundles)
318            .ok_or(ArithmeticError::Overflow)?;
319
320        let num_operations =
321            u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
322        self.num_operations = self
323            .num_operations
324            .checked_add(num_operations)
325            .ok_or(ArithmeticError::Overflow)?;
326
327        let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
328            .map_err(|_| ArithmeticError::Overflow)?;
329        self.num_outgoing_messages = self
330            .num_outgoing_messages
331            .checked_add(num_outgoing_messages)
332            .ok_or(ArithmeticError::Overflow)?;
333
334        Ok(())
335    }
336}
337
338/// The state of a channel followed by subscribers.
339#[derive(Debug, ClonableView, View, SimpleObject)]
340pub struct ChannelStateView<C>
341where
342    C: Context + Send + Sync,
343{
344    /// The current subscribers.
345    pub subscribers: SetView<C, ChainId>,
346    /// The block heights so far, to be sent to future subscribers.
347    pub block_heights: LogView<C, BlockHeight>,
348}
349
350impl<C> ChainStateView<C>
351where
352    C: Context + Clone + Send + Sync + 'static,
353    C::Extra: ExecutionRuntimeContext,
354{
355    /// Returns the [`ChainId`] of the chain this [`ChainStateView`] represents.
356    pub fn chain_id(&self) -> ChainId {
357        self.context().extra().chain_id()
358    }
359
360    pub async fn query_application(
361        &mut self,
362        local_time: Timestamp,
363        query: Query,
364        service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
365    ) -> Result<QueryOutcome, ChainError> {
366        let context = QueryContext {
367            chain_id: self.chain_id(),
368            next_block_height: self.tip_state.get().next_block_height,
369            local_time,
370        };
371        self.execution_state
372            .query_application(context, query, service_runtime_endpoint)
373            .await
374            .with_execution_context(ChainExecutionContext::Query)
375    }
376
377    pub async fn describe_application(
378        &mut self,
379        application_id: ApplicationId,
380    ) -> Result<ApplicationDescription, ChainError> {
381        self.execution_state
382            .system
383            .describe_application(application_id, None)
384            .await
385            .with_execution_context(ChainExecutionContext::DescribeApplication)
386    }
387
388    pub async fn mark_messages_as_received(
389        &mut self,
390        target: &Target,
391        height: BlockHeight,
392    ) -> Result<bool, ChainError> {
393        let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
394        let updates = outbox.mark_messages_as_received(height).await?;
395        if updates.is_empty() {
396            return Ok(false);
397        }
398        for update in updates {
399            let counter = self
400                .outbox_counters
401                .get_mut()
402                .get_mut(&update)
403                .expect("message counter should be present");
404            *counter = counter
405                .checked_sub(1)
406                .expect("message counter should not underflow");
407            if *counter == 0 {
408                // Important for the test in `all_messages_delivered_up_to`.
409                self.outbox_counters.get_mut().remove(&update);
410            }
411        }
412        if outbox.queue.count() == 0 {
413            self.outboxes.remove_entry(target)?;
414        }
415        #[cfg(with_metrics)]
416        NUM_OUTBOXES
417            .with_label_values(&[])
418            .observe(self.outboxes.count().await? as f64);
419        Ok(true)
420    }
421
422    /// Returns true if there are no more outgoing messages in flight up to the given
423    /// block height.
424    pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
425        tracing::debug!(
426            "Messages left in {:.8}'s outbox: {:?}",
427            self.chain_id(),
428            self.outbox_counters.get()
429        );
430        if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
431            key > &height
432        } else {
433            true
434        }
435    }
436
437    /// Invariant for the states of active chains.
438    pub fn is_active(&self) -> bool {
439        self.execution_state.system.is_active()
440    }
441
442    /// Invariant for the states of active chains.
443    pub fn ensure_is_active(&self) -> Result<(), ChainError> {
444        if self.is_active() {
445            Ok(())
446        } else {
447            Err(ChainError::InactiveChain(self.chain_id()))
448        }
449    }
450
451    /// Verifies that this chain is up-to-date and all the messages executed ahead of time
452    /// have been properly received by now.
453    pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
454        let chain_id = self.chain_id();
455        let pairs = self.inboxes.try_load_all_entries().await?;
456        let max_stream_queries = self.context().max_stream_queries();
457        let stream = stream::iter(pairs)
458            .map(|(origin, inbox)| async move {
459                if let Some(bundle) = inbox.removed_bundles.front().await? {
460                    return Err(ChainError::MissingCrossChainUpdate {
461                        chain_id,
462                        origin: origin.into(),
463                        height: bundle.height,
464                    });
465                }
466                Ok::<(), ChainError>(())
467            })
468            .buffer_unordered(max_stream_queries);
469        stream.try_collect::<Vec<_>>().await?;
470        Ok(())
471    }
472
473    pub async fn next_block_height_to_receive(
474        &self,
475        origin: &Origin,
476    ) -> Result<BlockHeight, ChainError> {
477        let inbox = self.inboxes.try_load_entry(origin).await?;
478        match inbox {
479            Some(inbox) => inbox.next_block_height_to_receive(),
480            None => Ok(BlockHeight::from(0)),
481        }
482    }
483
484    pub async fn last_anticipated_block_height(
485        &self,
486        origin: &Origin,
487    ) -> Result<Option<BlockHeight>, ChainError> {
488        let inbox = self.inboxes.try_load_entry(origin).await?;
489        match inbox {
490            Some(inbox) => match inbox.removed_bundles.back().await? {
491                Some(bundle) => Ok(Some(bundle.height)),
492                None => Ok(None),
493            },
494            None => Ok(None),
495        }
496    }
497
498    /// Attempts to process a new `bundle` of messages from the given `origin`. Returns an
499    /// internal error if the bundle doesn't appear to be new, based on the sender's
500    /// height. The value `local_time` is specific to each validator and only used for
501    /// round timeouts.
502    ///
503    /// Returns `true` if incoming `Subscribe` messages created new outbox entries.
504    pub async fn receive_message_bundle(
505        &mut self,
506        origin: &Origin,
507        bundle: MessageBundle,
508        local_time: Timestamp,
509        add_to_received_log: bool,
510    ) -> Result<(), ChainError> {
511        assert!(!bundle.messages.is_empty());
512        let chain_id = self.chain_id();
513        tracing::trace!(
514            "Processing new messages to {chain_id:.8} from {origin} at height {}",
515            bundle.height,
516        );
517        let chain_and_height = ChainAndHeight {
518            chain_id: origin.sender,
519            height: bundle.height,
520        };
521
522        // Handle immediate messages.
523        for posted_message in &bundle.messages {
524            if let Some(config) = posted_message.message.matches_open_chain() {
525                if self.execution_state.system.description.get().is_none() {
526                    let message_id = chain_and_height.to_message_id(posted_message.index);
527                    self.execute_init_message(message_id, config, bundle.timestamp, local_time)
528                        .await?;
529                }
530            }
531        }
532        // Process the inbox bundle and update the inbox state.
533        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
534        #[cfg(with_metrics)]
535        NUM_INBOXES
536            .with_label_values(&[])
537            .observe(self.inboxes.count().await? as f64);
538        let entry = BundleInInbox::new(origin.clone(), &bundle);
539        let skippable = bundle.is_skippable();
540        let newly_added = inbox
541            .add_bundle(bundle)
542            .await
543            .map_err(|error| match error {
544                InboxError::ViewError(error) => ChainError::ViewError(error),
545                error => ChainError::InternalError(format!(
546                    "while processing messages in certified block: {error}"
547                )),
548            })?;
549        if newly_added && !skippable {
550            let seen = local_time;
551            self.unskippable_bundles
552                .push_back(TimestampedBundleInInbox { entry, seen });
553        }
554
555        // Remember the certificate for future validator/client synchronizations.
556        if add_to_received_log {
557            self.received_log.push(chain_and_height);
558        }
559        Ok(())
560    }
561
562    /// Updates the `received_log` trackers.
563    pub fn update_received_certificate_trackers(
564        &mut self,
565        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
566    ) {
567        for (name, tracker) in new_trackers {
568            self.received_certificate_trackers
569                .get_mut()
570                .entry(name)
571                .and_modify(|t| {
572                    // Because several synchronizations could happen in parallel, we need to make
573                    // sure to never go backward.
574                    if tracker > *t {
575                        *t = tracker;
576                    }
577                })
578                .or_insert(tracker);
579        }
580    }
581
582    /// Verifies that the block's first message is `OpenChain`. Initializes the chain if necessary.
583    pub async fn execute_init_message_from(
584        &mut self,
585        block: &Block,
586        local_time: Timestamp,
587    ) -> Result<(), ChainError> {
588        let (in_bundle, posted_message, config) = block
589            .starts_with_open_chain_message()
590            .ok_or_else(|| ChainError::InactiveChain(block.header.chain_id))?;
591        if self.is_active() {
592            return Ok(()); // Already initialized.
593        }
594        let message_id = MessageId {
595            chain_id: in_bundle.origin.sender,
596            height: in_bundle.bundle.height,
597            index: posted_message.index,
598        };
599        self.execute_init_message(message_id, config, block.header.timestamp, local_time)
600            .await
601    }
602
603    /// Initializes the chain using the given configuration.
604    async fn execute_init_message(
605        &mut self,
606        message_id: MessageId,
607        config: &OpenChainConfig,
608        timestamp: Timestamp,
609        local_time: Timestamp,
610    ) -> Result<(), ChainError> {
611        // Initialize ourself.
612        self.execution_state
613            .system
614            .initialize_chain(message_id, timestamp, config.clone());
615        // Recompute the state hash.
616        let hash = self.execution_state.crypto_hash().await?;
617        self.execution_state_hash.set(Some(hash));
618        let maybe_committee = self.execution_state.system.current_committee().into_iter();
619        // Last, reset the consensus state based on the current ownership.
620        self.manager.reset(
621            self.execution_state.system.ownership.get().clone(),
622            BlockHeight(0),
623            local_time,
624            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
625        )
626    }
627
628    pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
629        self.execution_state
630            .system
631            .current_committee()
632            .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
633    }
634
635    pub fn ownership(&self) -> &ChainOwnership {
636        self.execution_state.system.ownership.get()
637    }
638
639    /// Removes the incoming message bundles in the block from the inboxes.
640    pub async fn remove_bundles_from_inboxes(
641        &mut self,
642        timestamp: Timestamp,
643        incoming_bundles: &[IncomingBundle],
644    ) -> Result<(), ChainError> {
645        let chain_id = self.chain_id();
646        let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
647        for IncomingBundle { bundle, origin, .. } in incoming_bundles {
648            ensure!(
649                bundle.timestamp <= timestamp,
650                ChainError::IncorrectBundleTimestamp {
651                    chain_id,
652                    bundle_timestamp: bundle.timestamp,
653                    block_timestamp: timestamp,
654                }
655            );
656            let bundles = bundles_by_origin.entry(origin).or_default();
657            bundles.push(bundle);
658        }
659        let origins = bundles_by_origin.keys().copied();
660        let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
661        let mut removed_unskippable = HashSet::new();
662        for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
663            tracing::trace!(
664                "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
665                bundles
666                    .iter()
667                    .map(|bundle| bundle.height)
668                    .collect::<Vec<_>>()
669            );
670            for bundle in bundles {
671                // Mark the message as processed in the inbox.
672                let was_present = inbox
673                    .remove_bundle(bundle)
674                    .await
675                    .map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
676                if was_present && !bundle.is_skippable() {
677                    removed_unskippable.insert(BundleInInbox::new(origin.clone(), bundle));
678                }
679            }
680        }
681        if !removed_unskippable.is_empty() {
682            // Delete all removed bundles from the front of the unskippable queue.
683            let maybe_front = self.unskippable_bundles.front().await?;
684            if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
685                self.unskippable_bundles.delete_front();
686                while let Some(ts_entry) = self.unskippable_bundles.front().await? {
687                    if !removed_unskippable.remove(&ts_entry.entry) {
688                        if !self
689                            .removed_unskippable_bundles
690                            .contains(&ts_entry.entry)
691                            .await?
692                        {
693                            break;
694                        }
695                        self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
696                    }
697                    self.unskippable_bundles.delete_front();
698                }
699            }
700            for entry in removed_unskippable {
701                self.removed_unskippable_bundles.insert(&entry)?;
702            }
703        }
704        #[cfg(with_metrics)]
705        NUM_INBOXES
706            .with_label_values(&[])
707            .observe(self.inboxes.count().await? as f64);
708        Ok(())
709    }
710
711    /// Executes a block: first the incoming messages, then the main operation.
712    /// Does not update chain state other than the execution state.
713    #[expect(clippy::too_many_arguments)]
714    pub async fn execute_block_inner(
715        chain: &mut ExecutionStateView<C>,
716        confirmed_log: &LogView<C, CryptoHash>,
717        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
718        block: &ProposedBlock,
719        local_time: Timestamp,
720        round: Option<u32>,
721        published_blobs: &[Blob],
722        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
723    ) -> Result<
724        (
725            BlockExecutionOutcome,
726            Vec<(ChannelFullName, ChainId)>,
727            Vec<(ChannelFullName, ChainId)>,
728        ),
729        ChainError,
730    > {
731        #[cfg(with_metrics)]
732        let _execution_latency = BLOCK_EXECUTION_LATENCY.measure_latency();
733
734        assert_eq!(block.chain_id, chain.context().extra().chain_id());
735
736        ensure!(
737            *chain.system.timestamp.get() <= block.timestamp,
738            ChainError::InvalidBlockTimestamp
739        );
740        chain.system.timestamp.set(block.timestamp);
741        let (_, committee) = chain
742            .system
743            .current_committee()
744            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?;
745        let mut resource_controller = ResourceController {
746            policy: Arc::new(committee.policy().clone()),
747            tracker: ResourceTracker::default(),
748            account: block.authenticated_signer,
749        };
750        ensure!(
751            block.published_blob_ids()
752                == published_blobs
753                    .iter()
754                    .map(|blob| blob.id())
755                    .collect::<BTreeSet<_>>(),
756            ChainError::InternalError("published_blobs mismatch".to_string())
757        );
758        resource_controller
759            .track_block_size(EMPTY_BLOCK_SIZE)
760            .with_execution_context(ChainExecutionContext::Block)?;
761        for blob in published_blobs {
762            let blob_type = blob.content().blob_type();
763            if blob_type == BlobType::Data
764                || blob_type == BlobType::ContractBytecode
765                || blob_type == BlobType::ServiceBytecode
766                || blob_type == BlobType::EvmBytecode
767            {
768                resource_controller
769                    .with_state(&mut chain.system)
770                    .await?
771                    .track_blob_published(blob.content())
772                    .with_execution_context(ChainExecutionContext::Block)?;
773            }
774            chain.system.used_blobs.insert(&blob.id())?;
775        }
776
777        if *chain.system.closed.get() {
778            ensure!(
779                !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
780                ChainError::ClosedChain
781            );
782        }
783        Self::check_app_permissions(chain.system.application_permissions.get(), block)?;
784
785        // Execute each incoming bundle as a transaction, then each operation.
786        // Collect messages, events and oracle responses, each as one list per transaction.
787        let mut replaying_oracle_responses = replaying_oracle_responses.map(Vec::into_iter);
788        let mut next_message_index = 0;
789        let mut next_application_index = 0;
790        let mut oracle_responses = Vec::new();
791        let mut events = Vec::new();
792        let mut blobs = Vec::new();
793        let mut messages = Vec::new();
794        let mut operation_results = Vec::new();
795        let mut subscribe = Vec::new();
796        let mut unsubscribe = Vec::new();
797        for (txn_index, transaction) in block.transactions() {
798            let chain_execution_context = match transaction {
799                Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
800                Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
801            };
802            let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
803                Some(Some(responses)) => Some(responses),
804                Some(None) => return Err(ChainError::MissingOracleResponseList),
805                None => None,
806            };
807            let mut txn_tracker = TransactionTracker::new(
808                local_time,
809                txn_index,
810                next_message_index,
811                next_application_index,
812                maybe_responses,
813            );
814            match transaction {
815                Transaction::ReceiveMessages(incoming_bundle) => {
816                    resource_controller
817                        .track_block_size_of(&incoming_bundle)
818                        .with_execution_context(chain_execution_context)?;
819                    for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
820                        Box::pin(Self::execute_message_in_block(
821                            chain,
822                            message_id,
823                            posted_message,
824                            incoming_bundle,
825                            block,
826                            round,
827                            &mut txn_tracker,
828                            &mut resource_controller,
829                        ))
830                        .await?;
831                    }
832                }
833                Transaction::ExecuteOperation(operation) => {
834                    resource_controller
835                        .track_block_size_of(&operation)
836                        .with_execution_context(chain_execution_context)?;
837                    #[cfg(with_metrics)]
838                    let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
839                    let context = OperationContext {
840                        chain_id: block.chain_id,
841                        height: block.height,
842                        index: Some(txn_index),
843                        round,
844                        authenticated_signer: block.authenticated_signer,
845                        authenticated_caller_id: None,
846                    };
847                    Box::pin(chain.execute_operation(
848                        context,
849                        operation.clone(),
850                        &mut txn_tracker,
851                        &mut resource_controller,
852                    ))
853                    .await
854                    .with_execution_context(chain_execution_context)?;
855                    resource_controller
856                        .with_state(&mut chain.system)
857                        .await?
858                        .track_operation(operation)
859                        .with_execution_context(chain_execution_context)?;
860                }
861            }
862
863            let txn_outcome = txn_tracker
864                .into_outcome()
865                .with_execution_context(chain_execution_context)?;
866            next_message_index = txn_outcome.next_message_index;
867            next_application_index = txn_outcome.next_application_index;
868
869            subscribe.extend(txn_outcome.subscribe);
870            unsubscribe.extend(txn_outcome.unsubscribe);
871
872            if matches!(
873                transaction,
874                Transaction::ExecuteOperation(_)
875                    | Transaction::ReceiveMessages(IncomingBundle {
876                        action: MessageAction::Accept,
877                        ..
878                    })
879            ) {
880                for message_out in &txn_outcome.outgoing_messages {
881                    resource_controller
882                        .with_state(&mut chain.system)
883                        .await?
884                        .track_message(&message_out.message)
885                        .with_execution_context(chain_execution_context)?;
886                }
887            }
888
889            resource_controller
890                .track_block_size_of(&(
891                    &txn_outcome.oracle_responses,
892                    &txn_outcome.outgoing_messages,
893                    &txn_outcome.events,
894                    &txn_outcome.blobs,
895                ))
896                .with_execution_context(chain_execution_context)?;
897            for blob in &txn_outcome.blobs {
898                if blob.content().blob_type() == BlobType::Data {
899                    resource_controller
900                        .with_state(&mut chain.system)
901                        .await?
902                        .track_blob_published(blob.content())
903                        .with_execution_context(chain_execution_context)?;
904                }
905            }
906            oracle_responses.push(txn_outcome.oracle_responses);
907            messages.push(txn_outcome.outgoing_messages);
908            events.push(txn_outcome.events);
909            blobs.push(txn_outcome.blobs);
910
911            if let Transaction::ExecuteOperation(_) = transaction {
912                resource_controller
913                    .track_block_size_of(&(&txn_outcome.operation_result))
914                    .with_execution_context(chain_execution_context)?;
915                operation_results.push(OperationResult(txn_outcome.operation_result));
916            }
917        }
918
919        // Finally, charge for the block fee, except if the chain is closed. Closed chains should
920        // always be able to reject incoming messages.
921        if !chain.system.closed.get() {
922            resource_controller
923                .with_state(&mut chain.system)
924                .await?
925                .track_block()
926                .with_execution_context(ChainExecutionContext::Block)?;
927        }
928
929        let recipients = messages
930            .iter()
931            .flatten()
932            .flat_map(|message| message.destination.recipient())
933            .collect::<BTreeSet<_>>();
934        let mut previous_message_blocks = BTreeMap::new();
935        for recipient in recipients {
936            if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
937                let hash = confirmed_log
938                    .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
939                    .await?
940                    .ok_or_else(|| {
941                        ChainError::InternalError("missing entry in confirmed_log".into())
942                    })?;
943                previous_message_blocks.insert(recipient, hash);
944            }
945        }
946
947        let txn_count = block.incoming_bundles.len() + block.operations.len();
948        assert_eq!(oracle_responses.len(), txn_count);
949        assert_eq!(messages.len(), txn_count);
950        assert_eq!(events.len(), txn_count);
951        assert_eq!(blobs.len(), txn_count);
952
953        #[cfg(with_metrics)]
954        Self::track_block_metrics(&resource_controller.tracker);
955
956        let state_hash = {
957            #[cfg(with_metrics)]
958            let _hash_latency = STATE_HASH_COMPUTATION_LATENCY.measure_latency();
959            chain.crypto_hash().await?
960        };
961
962        let outcome = BlockExecutionOutcome {
963            messages,
964            previous_message_blocks,
965            state_hash,
966            oracle_responses,
967            events,
968            blobs,
969            operation_results,
970        };
971
972        Ok((outcome, subscribe, unsubscribe))
973    }
974
975    /// Executes a block: first the incoming messages, then the main operation.
976    /// Does not update chain state other than the execution state.
977    pub async fn execute_block(
978        &mut self,
979        block: &ProposedBlock,
980        local_time: Timestamp,
981        round: Option<u32>,
982        published_blobs: &[Blob],
983        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
984    ) -> Result<
985        (
986            BlockExecutionOutcome,
987            Vec<(ChannelFullName, ChainId)>,
988            Vec<(ChannelFullName, ChainId)>,
989        ),
990        ChainError,
991    > {
992        Self::execute_block_inner(
993            &mut self.execution_state,
994            &self.confirmed_log,
995            &self.previous_message_blocks,
996            block,
997            local_time,
998            round,
999            published_blobs,
1000            replaying_oracle_responses,
1001        )
1002        .await
1003    }
1004
1005    /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
1006    /// manager. This does not touch the execution state itself, which must be updated separately.
1007    pub async fn apply_confirmed_block(
1008        &mut self,
1009        block: &ConfirmedBlock,
1010        local_time: Timestamp,
1011    ) -> Result<(), ChainError> {
1012        let hash = block.inner().hash();
1013        let block = block.inner().inner();
1014        self.execution_state_hash.set(Some(block.header.state_hash));
1015        for txn_messages in &block.body.messages {
1016            self.process_outgoing_messages(block.header.height, txn_messages)
1017                .await?;
1018        }
1019
1020        let recipients = block
1021            .body
1022            .messages
1023            .iter()
1024            .flatten()
1025            .flat_map(|message| message.destination.recipient())
1026            .collect::<BTreeSet<_>>();
1027        for recipient in recipients {
1028            self.previous_message_blocks
1029                .insert(&recipient, block.header.height)?;
1030        }
1031        // Last, reset the consensus state based on the current ownership.
1032        self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
1033
1034        // Advance to next block height.
1035        let tip = self.tip_state.get_mut();
1036        tip.block_hash = Some(hash);
1037        tip.next_block_height.try_add_assign_one()?;
1038        tip.update_counters(
1039            &block.body.incoming_bundles,
1040            &block.body.operations,
1041            &block.body.messages,
1042        )?;
1043        self.confirmed_log.push(hash);
1044        Ok(())
1045    }
1046
1047    /// Executes a message as part of an incoming bundle in a block.
1048    #[expect(clippy::too_many_arguments)]
1049    async fn execute_message_in_block(
1050        chain: &mut ExecutionStateView<C>,
1051        message_id: MessageId,
1052        posted_message: &PostedMessage,
1053        incoming_bundle: &IncomingBundle,
1054        block: &ProposedBlock,
1055        round: Option<u32>,
1056        txn_tracker: &mut TransactionTracker,
1057        resource_controller: &mut ResourceController<Option<AccountOwner>>,
1058    ) -> Result<(), ChainError> {
1059        #[cfg(with_metrics)]
1060        let _message_latency = MESSAGE_EXECUTION_LATENCY.measure_latency();
1061        let context = MessageContext {
1062            chain_id: block.chain_id,
1063            is_bouncing: posted_message.is_bouncing(),
1064            height: block.height,
1065            round,
1066            certificate_hash: incoming_bundle.bundle.certificate_hash,
1067            message_id,
1068            authenticated_signer: posted_message.authenticated_signer,
1069            refund_grant_to: posted_message.refund_grant_to,
1070        };
1071        let mut grant = posted_message.grant;
1072        match incoming_bundle.action {
1073            MessageAction::Accept => {
1074                let chain_execution_context =
1075                    ChainExecutionContext::IncomingBundle(txn_tracker.transaction_index());
1076                // Once a chain is closed, accepting incoming messages is not allowed.
1077                ensure!(!chain.system.closed.get(), ChainError::ClosedChain);
1078
1079                Box::pin(chain.execute_message(
1080                    context,
1081                    posted_message.message.clone(),
1082                    (grant > Amount::ZERO).then_some(&mut grant),
1083                    txn_tracker,
1084                    resource_controller,
1085                ))
1086                .await
1087                .with_execution_context(chain_execution_context)?;
1088                chain
1089                    .send_refund(context, grant, txn_tracker)
1090                    .await
1091                    .with_execution_context(chain_execution_context)?;
1092            }
1093            MessageAction::Reject => {
1094                // If rejecting a message fails, the entire block proposal should be
1095                // scrapped.
1096                ensure!(
1097                    !posted_message.is_protected() || *chain.system.closed.get(),
1098                    ChainError::CannotRejectMessage {
1099                        chain_id: block.chain_id,
1100                        origin: Box::new(incoming_bundle.origin.clone()),
1101                        posted_message: Box::new(posted_message.clone()),
1102                    }
1103                );
1104                if posted_message.is_tracked() {
1105                    // Bounce the message.
1106                    chain
1107                        .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
1108                        .await
1109                        .with_execution_context(ChainExecutionContext::Block)?;
1110                } else {
1111                    // Nothing to do except maybe refund the grant.
1112                    chain
1113                        .send_refund(context, grant, txn_tracker)
1114                        .await
1115                        .with_execution_context(ChainExecutionContext::Block)?;
1116                }
1117            }
1118        }
1119        Ok(())
1120    }
1121
1122    /// Returns whether this is a child chain.
1123    pub fn is_child(&self) -> bool {
1124        let Some(description) = self.execution_state.system.description.get() else {
1125            // Root chains are always initialized, so this must be a child chain.
1126            return true;
1127        };
1128        description.is_child()
1129    }
1130
1131    /// Verifies that the block is valid according to the chain's application permission settings.
1132    fn check_app_permissions(
1133        app_permissions: &ApplicationPermissions,
1134        block: &ProposedBlock,
1135    ) -> Result<(), ChainError> {
1136        let mut mandatory = HashSet::<ApplicationId>::from_iter(
1137            app_permissions.mandatory_applications.iter().cloned(),
1138        );
1139        for operation in &block.operations {
1140            ensure!(
1141                app_permissions.can_execute_operations(&operation.application_id()),
1142                ChainError::AuthorizedApplications(
1143                    app_permissions.execute_operations.clone().unwrap()
1144                )
1145            );
1146            if let Operation::User { application_id, .. } = operation {
1147                mandatory.remove(application_id);
1148            }
1149        }
1150        for pending in block.incoming_messages() {
1151            if mandatory.is_empty() {
1152                break;
1153            }
1154            if let Message::User { application_id, .. } = &pending.message {
1155                mandatory.remove(application_id);
1156            }
1157        }
1158        ensure!(
1159            mandatory.is_empty(),
1160            ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1161        );
1162        Ok(())
1163    }
1164
1165    /// Resets the chain manager for the next block height.
1166    fn reset_chain_manager(
1167        &mut self,
1168        next_height: BlockHeight,
1169        local_time: Timestamp,
1170    ) -> Result<(), ChainError> {
1171        let maybe_committee = self.execution_state.system.current_committee().into_iter();
1172        let ownership = self.execution_state.system.ownership.get().clone();
1173        let fallback_owners =
1174            maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1175        self.pending_validated_blobs.clear();
1176        self.pending_proposed_blobs.clear();
1177        self.manager
1178            .reset(ownership, next_height, local_time, fallback_owners)
1179    }
1180
1181    /// Tracks block execution metrics in Prometheus.
1182    #[cfg(with_metrics)]
1183    fn track_block_metrics(tracker: &ResourceTracker) {
1184        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
1185        WASM_FUEL_USED_PER_BLOCK
1186            .with_label_values(&[])
1187            .observe(tracker.fuel as f64);
1188        WASM_NUM_READS_PER_BLOCK
1189            .with_label_values(&[])
1190            .observe(tracker.read_operations as f64);
1191        WASM_BYTES_READ_PER_BLOCK
1192            .with_label_values(&[])
1193            .observe(tracker.bytes_read as f64);
1194        WASM_BYTES_WRITTEN_PER_BLOCK
1195            .with_label_values(&[])
1196            .observe(tracker.bytes_written as f64);
1197    }
1198
1199    async fn process_outgoing_messages(
1200        &mut self,
1201        height: BlockHeight,
1202        messages: &[OutgoingMessage],
1203    ) -> Result<(), ChainError> {
1204        let max_stream_queries = self.context().max_stream_queries();
1205        // Record the messages of the execution. Messages are understood within an
1206        // application.
1207        let mut recipients = HashSet::new();
1208        let mut channel_broadcasts = HashSet::new();
1209        for message in messages {
1210            match &message.destination {
1211                Destination::Recipient(id) => {
1212                    recipients.insert(*id);
1213                }
1214                Destination::Subscribers(name) => {
1215                    ensure!(
1216                        message.grant == Amount::ZERO,
1217                        ChainError::GrantUseOnBroadcast
1218                    );
1219                    let GenericApplicationId::User(application_id) =
1220                        message.message.application_id()
1221                    else {
1222                        return Err(ChainError::InternalError(
1223                            "System messages cannot be sent to channels".to_string(),
1224                        ));
1225                    };
1226                    channel_broadcasts.insert(ChannelFullName {
1227                        application_id,
1228                        name: name.clone(),
1229                    });
1230                }
1231            }
1232        }
1233
1234        // Update the (regular) outboxes.
1235        let outbox_counters = self.outbox_counters.get_mut();
1236        let targets = recipients
1237            .into_iter()
1238            .map(Target::chain)
1239            .collect::<Vec<_>>();
1240        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1241        for mut outbox in outboxes {
1242            if outbox.schedule_message(height)? {
1243                *outbox_counters.entry(height).or_default() += 1;
1244            }
1245        }
1246
1247        let full_names = channel_broadcasts.into_iter().collect::<Vec<_>>();
1248        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1249        let stream = full_names.into_iter().zip(channels);
1250        let stream = stream::iter(stream)
1251            .map(|(full_name, mut channel)| async move {
1252                let recipients = channel.subscribers.indices().await?;
1253                channel.block_heights.push(height);
1254                let targets = recipients
1255                    .into_iter()
1256                    .map(|recipient| Target::channel(recipient, full_name.clone()))
1257                    .collect::<Vec<_>>();
1258                Ok::<_, ChainError>(targets)
1259            })
1260            .buffer_unordered(max_stream_queries);
1261        let infos = stream.try_collect::<Vec<_>>().await?;
1262        let targets = infos.into_iter().flatten().collect::<Vec<_>>();
1263        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1264        let outbox_counters = self.outbox_counters.get_mut();
1265        for mut outbox in outboxes {
1266            if outbox.schedule_message(height)? {
1267                *outbox_counters.entry(height).or_default() += 1;
1268            }
1269        }
1270        #[cfg(with_metrics)]
1271        NUM_OUTBOXES
1272            .with_label_values(&[])
1273            .observe(self.outboxes.count().await? as f64);
1274        Ok(())
1275    }
1276
1277    /// Processes new subscriptions. Returns `true` if at least one new subscriber was added for
1278    /// which we have outgoing messages.
1279    pub async fn process_subscribes(
1280        &mut self,
1281        names_and_ids: Vec<(ChannelFullName, ChainId)>,
1282    ) -> Result<bool, ChainError> {
1283        if names_and_ids.is_empty() {
1284            return Ok(false);
1285        }
1286        let full_names = names_and_ids
1287            .iter()
1288            .map(|(name, _)| name.clone())
1289            .collect::<Vec<_>>();
1290        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1291        let subscribe_channels = names_and_ids.into_iter().zip(channels);
1292        let max_stream_queries = self.context().max_stream_queries();
1293        let stream = stream::iter(subscribe_channels)
1294            .map(|((name, id), mut channel)| async move {
1295                if channel.subscribers.contains(&id).await? {
1296                    return Ok(None); // Was already a subscriber.
1297                }
1298                tracing::trace!("Adding subscriber {id:.8} for {name:}");
1299                channel.subscribers.insert(&id)?;
1300                // Send all messages.
1301                let heights = channel.block_heights.read(..).await?;
1302                if heights.is_empty() {
1303                    return Ok(None); // No messages on this channel yet.
1304                }
1305                let target = Target::channel(id, name.clone());
1306                Ok::<_, ChainError>(Some((target, heights)))
1307            })
1308            .buffer_unordered(max_stream_queries);
1309        let infos = stream.try_collect::<Vec<_>>().await?;
1310        let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
1311        let mut new_outbox_entries = false;
1312        let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1313        let outbox_counters = self.outbox_counters.get_mut();
1314        for (heights, mut outbox) in heights.into_iter().zip(outboxes) {
1315            for height in heights {
1316                if outbox.schedule_message(height)? {
1317                    *outbox_counters.entry(height).or_default() += 1;
1318                    new_outbox_entries = true;
1319                }
1320            }
1321        }
1322        #[cfg(with_metrics)]
1323        NUM_OUTBOXES
1324            .with_label_values(&[])
1325            .observe(self.outboxes.count().await? as f64);
1326        Ok(new_outbox_entries)
1327    }
1328
1329    pub async fn process_unsubscribes(
1330        &mut self,
1331        names_and_ids: Vec<(ChannelFullName, ChainId)>,
1332    ) -> Result<(), ChainError> {
1333        if names_and_ids.is_empty() {
1334            return Ok(());
1335        }
1336        let full_names = names_and_ids
1337            .iter()
1338            .map(|(name, _)| name.clone())
1339            .collect::<Vec<_>>();
1340        let channels = self.channels.try_load_entries_mut(&full_names).await?;
1341        for ((_name, id), mut channel) in names_and_ids.into_iter().zip(channels) {
1342            // Remove subscriber. Do not remove the channel outbox yet.
1343            channel.subscribers.remove(&id)?;
1344        }
1345        Ok(())
1346    }
1347}
1348
1349#[test]
1350fn empty_block_size() {
1351    let size = bcs::serialized_size(&crate::block::Block::new(
1352        crate::test::make_first_block(ChainId::root(0)),
1353        crate::data_types::BlockExecutionOutcome::default(),
1354    ))
1355    .unwrap();
1356    assert_eq!(size, EMPTY_BLOCK_SIZE);
1357}