1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14 OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19 time::{Duration, Instant},
20};
21use linera_execution::{
22 committee::Committee, system::EPOCH_STREAM_NAME, ExecutionRuntimeContext, ExecutionStateView,
23 Message, Operation, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
24 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
25 FLAG_MANDATORY_APPS_NEED_ACCEPTED_MESSAGE,
26};
27use linera_views::{
28 bucket_queue_view::BucketQueueView,
29 context::Context,
30 log_view::LogView,
31 map_view::MapView,
32 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
33 register_view::RegisterView,
34 set_view::SetView,
35 views::{ClonableView, RootView, View},
36};
37use serde::{Deserialize, Serialize};
38use tracing::{info, instrument};
39
40use crate::{
41 block::{Block, ConfirmedBlock},
42 block_tracker::BlockExecutionTracker,
43 data_types::{
44 BlockExecutionOutcome, BundleExecutionPolicy, BundleFailurePolicy, ChainAndHeight,
45 IncomingBundle, MessageAction, MessageBundle, ProposedBlock, Transaction,
46 },
47 inbox::{Cursor, InboxError, InboxStateView},
48 manager::ChainManager,
49 outbox::OutboxStateView,
50 pending_blobs::PendingBlobsView,
51 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
52};
53
54#[cfg(test)]
55#[path = "unit_tests/chain_tests.rs"]
56mod chain_tests;
57
58#[cfg(with_metrics)]
59use linera_base::prometheus_util::MeasureLatency;
60
/// Prometheus metrics recorded while executing blocks on a chain.
///
/// The whole module is only compiled when the `with_metrics` cfg is set, so the items
/// inside it do not need any additional `#[cfg(with_metrics)]` gating.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter of executed blocks; incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of the latency of executing a whole block.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_interval(50.0_f64, 10_000_000.0),
        )
    });

    /// Histogram of the latency of executing a single message.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 1_000_000.0),
        )
    });

    /// Histogram of the latency of executing a single operation.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_interval(0.1_f64, 1_000_000.0),
        )
    });

    /// Histogram of the Wasm fuel consumed by a block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the EVM fuel consumed by a block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 100_000_000.0),
        )
    });

    /// Histogram of the number of read operations performed by a block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read by a block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written by a block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time needed to recompute the execution state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash, in microseconds",
            &[],
            exponential_bucket_interval(1.0, 2_000_000.0),
        )
    });

    /// Histogram of the number of outboxes of a chain.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Records the per-block metrics from the resources accumulated in `tracker`.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and feeds the fuel, read and write totals of
    /// the tracker into the corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
186
/// The size of an empty block.
// NOTE(review): presumably the serialized size, in bytes, of a block without any
// transactions; the encoding is not visible from this file — confirm before relying on it.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
189
/// A [`BundleInInbox`] paired with a [`Timestamp`].
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle.
    pub entry: BundleInInbox,
    /// The timestamp attached to the bundle.
    // NOTE(review): presumably the time at which the bundle was first seen — confirm.
    pub seen: Timestamp,
}
199
/// Identifies a message bundle in an inbox by its origin chain and its cursor.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct BundleInInbox {
    /// The chain the bundle was sent from.
    pub origin: ChainId,
    /// The cursor locating the bundle.
    pub cursor: Cursor,
}
209
/// The bucket size parameter of the `BucketQueueView` storing the
/// [`TimestampedBundleInInbox`] entries of `ChainStateView::unskippable_bundles`.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
213
/// A view accessing the state of a chain: its execution state, its tip, its inboxes and
/// outboxes, and the bookkeeping needed to link blocks, messages and events together.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// The execution state of the chain, including the system state.
    pub execution_state: ExecutionStateView<C>,
    /// The hash of the execution state, set after initialization and after each
    /// confirmed block.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// The tip of the chain: latest block hash, next block height and counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// The chain manager; it is reset whenever the chain is initialized or a block is
    /// confirmed.
    pub manager: ChainManager<C>,
    /// Pending blobs.
    // NOTE(review): presumably the blobs belonging to a validated but not yet confirmed
    // block — confirm.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// Pending blobs, keyed by account owner.
    // NOTE(review): presumably the blobs of block proposals, per proposer — confirm.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// The hashes of the confirmed blocks; entries are looked up using the block height
    /// as the index.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// The origin chain and height of the message bundles that were received and
    /// recorded in the log.
    pub received_log: LogView<C, ChainAndHeight>,
    /// A tracker value per validator; existing values are only ever replaced by larger
    /// ones.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// The inboxes, keyed by origin chain.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// A queue of timestamped bundles.
    // NOTE(review): presumably the bundles that a block proposer is not allowed to
    // skip — confirm.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// A set of bundles.
    // NOTE(review): presumably the unskippable bundles that were already removed from
    // their inbox — confirm.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// For each recipient chain, the height of the latest confirmed block that sent
    /// messages to it.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// For each event stream, the height of the latest confirmed block that emitted
    /// events on it.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// The outboxes, keyed by target chain.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// A counter per block height; it is decremented when messages of that height are
    /// marked as received and the entry is dropped when it reaches zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// The target chains whose outbox is not empty.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// The hashes of the blocks that were preprocessed but not yet confirmed, by height.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,

    /// For each event stream, the index of the next event that is expected.
    pub next_expected_events: MapView<C, StreamId, u32>,

    /// The origin chains whose inbox is not empty; only maintained when it is `Some`.
    // NOTE(review): the meaning of `None` is not visible from this file — confirm.
    pub nonempty_inboxes: RegisterView<C, Option<BTreeSet<ChainId>>>,

    /// The local time at which the block at height zero was applied.
    pub block_zero_executed_at: RegisterView<C, Timestamp>,
}
286
/// Block-chaining state: the latest confirmed block and cumulative counters.
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest confirmed block, or `None` if there is none yet.
    pub block_hash: Option<CryptoHash>,
    /// The height expected for the next block.
    pub next_block_height: BlockHeight,
    /// Total number of incoming bundle transactions in the confirmed blocks.
    pub num_incoming_bundles: u32,
    /// Total number of operation transactions in the confirmed blocks.
    pub num_operations: u32,
    /// Total number of outgoing messages in the confirmed blocks.
    pub num_outgoing_messages: u32,
}
302
303impl ChainTipState {
304 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
307 ensure!(
308 new_block.height == self.next_block_height,
309 ChainError::UnexpectedBlockHeight {
310 expected_block_height: self.next_block_height,
311 found_block_height: new_block.height
312 }
313 );
314 ensure!(
315 new_block.previous_block_hash == self.block_hash,
316 ChainError::UnexpectedPreviousBlockHash
317 );
318 Ok(())
319 }
320
321 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
324 ensure!(
325 self.next_block_height >= height,
326 ChainError::MissingEarlierBlocks {
327 current_block_height: self.next_block_height,
328 }
329 );
330 Ok(self.next_block_height > height)
331 }
332
333 pub fn update_counters(
335 &mut self,
336 transactions: &[Transaction],
337 messages: &[Vec<OutgoingMessage>],
338 ) -> Result<(), ChainError> {
339 let mut num_incoming_bundles = 0u32;
340 let mut num_operations = 0u32;
341
342 for transaction in transactions {
343 match transaction {
344 Transaction::ReceiveMessages(_) => {
345 num_incoming_bundles = num_incoming_bundles
346 .checked_add(1)
347 .ok_or(ArithmeticError::Overflow)?;
348 }
349 Transaction::ExecuteOperation(_) => {
350 num_operations = num_operations
351 .checked_add(1)
352 .ok_or(ArithmeticError::Overflow)?;
353 }
354 }
355 }
356
357 self.num_incoming_bundles = self
358 .num_incoming_bundles
359 .checked_add(num_incoming_bundles)
360 .ok_or(ArithmeticError::Overflow)?;
361
362 self.num_operations = self
363 .num_operations
364 .checked_add(num_operations)
365 .ok_or(ArithmeticError::Overflow)?;
366
367 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
368 .map_err(|_| ArithmeticError::Overflow)?;
369 self.num_outgoing_messages = self
370 .num_outgoing_messages
371 .checked_add(num_outgoing_messages)
372 .ok_or(ArithmeticError::Overflow)?;
373
374 Ok(())
375 }
376}
377
378impl<C> ChainStateView<C>
379where
380 C: Context + Clone + 'static,
381 C::Extra: ExecutionRuntimeContext,
382{
383 pub fn chain_id(&self) -> ChainId {
385 self.context().extra().chain_id()
386 }
387
388 #[instrument(skip_all, fields(
389 chain_id = %self.chain_id(),
390 ))]
391 pub async fn query_application(
392 &mut self,
393 local_time: Timestamp,
394 query: Query,
395 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
396 ) -> Result<QueryOutcome, ChainError> {
397 let context = QueryContext {
398 chain_id: self.chain_id(),
399 next_block_height: self.tip_state.get().next_block_height,
400 local_time,
401 };
402 self.execution_state
403 .query_application(context, query, service_runtime_endpoint)
404 .await
405 .with_execution_context(ChainExecutionContext::Query)
406 }
407
408 #[instrument(skip_all, fields(
409 chain_id = %self.chain_id(),
410 application_id = %application_id
411 ))]
412 pub async fn describe_application(
413 &mut self,
414 application_id: ApplicationId,
415 ) -> Result<ApplicationDescription, ChainError> {
416 self.execution_state
417 .system
418 .describe_application(application_id, &mut TransactionTracker::default())
419 .await
420 .with_execution_context(ChainExecutionContext::DescribeApplication)
421 }
422
423 #[instrument(skip_all, fields(
424 chain_id = %self.chain_id(),
425 target = %target,
426 height = %height
427 ))]
428 pub async fn mark_messages_as_received(
429 &mut self,
430 target: &ChainId,
431 height: BlockHeight,
432 ) -> Result<bool, ChainError> {
433 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
434 let updates = outbox.mark_messages_as_received(height).await?;
435 if updates.is_empty() {
436 return Ok(false);
437 }
438 for update in updates {
439 let counter = self
440 .outbox_counters
441 .get_mut()
442 .get_mut(&update)
443 .ok_or_else(|| {
444 ChainError::CorruptedChainState("message counter should be present".into())
445 })?;
446 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
447 if *counter == 0 {
448 self.outbox_counters.get_mut().remove(&update);
450 }
451 }
452 if outbox.queue.count() == 0 {
453 self.nonempty_outboxes.get_mut().remove(target);
454 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
456 self.outboxes.remove_entry(target)?;
457 }
458 }
459 #[cfg(with_metrics)]
460 metrics::NUM_OUTBOXES
461 .with_label_values(&[])
462 .observe(self.nonempty_outboxes.get().len() as f64);
463 Ok(true)
464 }
465
466 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
469 tracing::debug!(
470 "Messages left in {:.8}'s outbox: {:?}",
471 self.chain_id(),
472 self.outbox_counters.get()
473 );
474 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
475 key > &height
476 } else {
477 true
478 }
479 }
480
481 pub async fn is_active(&self) -> Result<bool, ChainError> {
483 Ok(self.execution_state.system.is_active().await?)
484 }
485
486 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
488 if self
490 .execution_state
491 .system
492 .initialize_chain(self.chain_id())
493 .await
494 .with_execution_context(ChainExecutionContext::Block)?
495 {
496 return Ok(());
498 }
499 let hash = self.execution_state.crypto_hash_mut().await?;
501 self.execution_state_hash.set(Some(hash));
502 self.reset_chain_manager(BlockHeight(0), local_time).await?;
503 Ok(())
504 }
505
506 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
510 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
511 return Ok(height.saturating_add(BlockHeight(1)));
512 }
513 Ok(self.tip_state.get().next_block_height)
514 }
515
516 #[instrument(skip_all, fields(
523 chain_id = %self.chain_id(),
524 origin = %origin,
525 bundle_height = %bundle.height
526 ))]
527 pub async fn receive_message_bundle_with_inbox(
528 &mut self,
529 inbox: &mut InboxStateView<C>,
530 origin: &ChainId,
531 bundle: MessageBundle,
532 local_time: Timestamp,
533 add_to_received_log: bool,
534 ) -> Result<(), ChainError> {
535 assert!(!bundle.messages.is_empty());
536 let chain_id = self.chain_id();
537 tracing::trace!(
538 "Processing new messages from {origin} at height {}",
539 bundle.height,
540 );
541 let chain_and_height = ChainAndHeight {
542 chain_id: *origin,
543 height: bundle.height,
544 };
545
546 match self.initialize_if_needed(local_time).await {
547 Ok(_) => (),
548 Err(ChainError::ExecutionError(exec_err, _))
551 if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
552 if blobs.iter().all(|blob_id| {
553 blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
554 })) => {}
555 err => {
556 return err;
557 }
558 }
559
560 let newly_added = inbox
562 .add_bundle(bundle)
563 .await
564 .map_err(|error| match error {
565 InboxError::ViewError(error) => ChainError::ViewError(error),
566 error => ChainError::CorruptedChainState(format!(
567 "while processing messages in certified block: {error}"
568 )),
569 })?;
570 if newly_added {
571 if let Some(set) = self.nonempty_inboxes.get_mut() {
572 set.insert(*origin);
573 }
574 }
575
576 if add_to_received_log {
578 self.received_log.push(chain_and_height);
579 }
580 Ok(())
581 }
582
583 pub fn update_received_certificate_trackers(
585 &mut self,
586 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
587 ) {
588 for (name, tracker) in new_trackers {
589 self.received_certificate_trackers
590 .get_mut()
591 .entry(name)
592 .and_modify(|t| {
593 if tracker > *t {
596 *t = tracker;
597 }
598 })
599 .or_insert(tracker);
600 }
601 }
602
603 pub async fn current_committee(&self) -> Result<(Epoch, Arc<Committee>), ChainError> {
604 self.execution_state
605 .system
606 .current_committee()
607 .await?
608 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
609 }
610
611 pub async fn ownership(&self) -> Result<&ChainOwnership, ChainError> {
612 Ok(self.execution_state.system.ownership.get().await?)
613 }
614
615 #[instrument(skip_all, fields(
621 chain_id = %self.chain_id(),
622 ))]
623 pub async fn remove_bundles_from_inboxes(
624 &mut self,
625 timestamp: Timestamp,
626 must_be_present: bool,
627 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
628 ) -> Result<(), ChainError> {
629 let chain_id = self.chain_id();
630 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
631 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
632 ensure!(
633 bundle.timestamp <= timestamp,
634 ChainError::IncorrectBundleTimestamp {
635 chain_id,
636 bundle_timestamp: bundle.timestamp,
637 block_timestamp: timestamp,
638 }
639 );
640 let bundles = bundles_by_origin.entry(*origin).or_default();
641 bundles.push(bundle);
642 }
643 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
644 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
645 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
646 tracing::trace!(
647 "Removing [{}] from inbox for {origin}",
648 bundles
649 .iter()
650 .map(|bundle| bundle.height.to_string())
651 .collect::<Vec<_>>()
652 .join(", ")
653 );
654 for bundle in bundles {
655 let was_present = inbox
657 .remove_bundle(bundle)
658 .await
659 .map_err(|error| (chain_id, origin, error))?;
660 if must_be_present {
661 ensure!(
662 was_present,
663 ChainError::MissingCrossChainUpdate {
664 chain_id,
665 origin,
666 height: bundle.height,
667 }
668 );
669 }
670 }
671 inbox.observe_size_metric();
672 if inbox.added_bundles.count() == 0 {
673 if let Some(set) = self.nonempty_inboxes.get_mut() {
674 set.remove(&origin);
675 }
676 }
677 }
678 Ok(())
679 }
680
681 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
683 self.nonempty_outboxes.get().iter().copied().collect()
684 }
685
686 pub async fn load_outboxes(
688 &self,
689 targets: &[ChainId],
690 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
691 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
692 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
693 optional_vec.ok_or_else(|| ChainError::CorruptedChainState("Missing outboxes".into()))
694 }
695
/// Executes the transactions of `block` on the execution state `chain` and assembles
/// the resulting [`BlockExecutionOutcome`] together with the resources that were used.
///
/// Depending on `exec_policy`, the block may be modified in place:
/// * With an `AutoRetry` failure policy, a checkpoint is taken before each incoming
///   bundle. If the bundle fails with a non-transient execution error, the checkpoint is
///   restored and the bundle is either discarded (limit errors, unless it is the first
///   transaction) or switched to `Reject`.
/// * With a time budget, all remaining bundles are discarded once the cumulative time
///   spent executing bundles reaches the budget.
///
/// # Errors
///
/// Returns [`ChainError::InactiveChain`] if there is no current committee,
/// [`ChainError::EmptyBlock`] if no transaction is left after discarding bundles, and any
/// execution error that is not handled by the failure policy.
///
/// # Panics
///
/// Panics if oracle responses are being replayed while the failure policy is not `Abort`.
#[instrument(skip_all, fields(
    chain_id = %block.chain_id,
    block_height = %block.height
))]
#[expect(clippy::too_many_arguments)]
async fn execute_block_inner(
    chain: &mut ExecutionStateView<C>,
    confirmed_log: &LogView<C, CryptoHash>,
    previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
    previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
    block: &mut ProposedBlock,
    local_time: Timestamp,
    round: Option<u32>,
    published_blobs: &[Blob],
    replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    exec_policy: BundleExecutionPolicy,
) -> Result<(BlockExecutionOutcome, ResourceTracker), ChainError> {
    // Retrying may change the block, which is incompatible with replaying recorded
    // oracle responses.
    if !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort) {
        assert!(
            replaying_oracle_responses.is_none(),
            "Cannot use AutoRetry policy when replaying oracle responses"
        );
    }

    #[cfg(with_metrics)]
    let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency_us();
    chain.system.timestamp.set(block.timestamp);

    // The resource policy comes from the current committee.
    let committee_policy = chain
        .system
        .current_committee()
        .await?
        .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
        .1
        .policy()
        .clone();

    let mut resource_controller = ResourceController::new(
        Arc::new(committee_policy),
        ResourceTracker::default(),
        block.authenticated_signer,
    );

    // Check the size of every published blob and record it as used.
    for blob in published_blobs {
        let blob_id = blob.id();
        resource_controller
            .policy()
            .check_blob_size(blob.content())
            .with_execution_context(ChainExecutionContext::Block)?;
        chain.system.used_blobs.insert(&blob_id)?;
    }

    let mut block_execution_tracker = BlockExecutionTracker::new(
        &mut resource_controller,
        published_blobs
            .iter()
            .map(|blob| (blob.id(), blob))
            .collect(),
        local_time,
        replaying_oracle_responses,
        block,
    )?;

    // With `Abort`, no bundle failure is tolerated and no checkpoint is taken.
    let max_failures = match exec_policy.on_failure {
        BundleFailurePolicy::Abort => 0,
        BundleFailurePolicy::AutoRetry { max_failures } => max_failures,
    };
    let auto_retry = !matches!(exec_policy.on_failure, BundleFailurePolicy::Abort);
    let mut failure_count = 0u32;

    // Only the time spent executing incoming bundles counts towards the budget.
    let time_budget = exec_policy.time_budget;
    let mut cumulative_bundle_time = Duration::ZERO;

    // `i` only advances when a transaction succeeds: discarding bundles shifts the
    // remaining transactions down, and a bundle switched to `Reject` is executed again.
    let mut i = 0;
    while i < block.transactions.len() {
        let transaction = &mut block.transactions[i];
        let is_bundle = matches!(transaction, Transaction::ReceiveMessages(_));

        // Out of time: drop this bundle and all the following ones, keep the operations.
        if is_bundle && time_budget.is_some_and(|budget| cumulative_bundle_time >= budget) {
            info!(
                ?cumulative_bundle_time,
                ?time_budget,
                "Time budget exceeded, discarding all remaining message bundles"
            );
            Self::discard_remaining_bundles(block, i, None);
            continue;
        }

        // Save the state before executing a bundle, so that a failure can be undone.
        let checkpoint = if auto_retry && is_bundle {
            Some((
                chain.clone_unchecked()?,
                block_execution_tracker.create_checkpoint(),
            ))
        } else {
            None
        };

        // The clock is only started when there is a budget to enforce.
        let bundle_start = if is_bundle && time_budget.is_some() {
            Some(Instant::now())
        } else {
            None
        };

        let result = block_execution_tracker
            .execute_transaction(&*transaction, round, chain)
            .await;

        if let Some(start) = bundle_start {
            cumulative_bundle_time += start.elapsed();
        }

        // Only a non-transient execution error in a bundle for which a checkpoint was
        // taken is recoverable; everything else either succeeds or aborts the block.
        let (error, context, incoming_bundle, saved_chain, saved_tracker) =
            match (result, transaction, checkpoint) {
                (Ok(()), _, _) => {
                    i += 1;
                    continue;
                }
                (
                    Err(ChainError::ExecutionError(error, context)),
                    Transaction::ReceiveMessages(incoming_bundle),
                    Some((saved_chain, saved_tracker)),
                ) if !error.is_transient_error() => {
                    (error, context, incoming_bundle, saved_chain, saved_tracker)
                }
                (Err(e), _, _) => return Err(e),
            };

        // Undo the effects of the failed bundle.
        *chain = saved_chain;
        block_execution_tracker.restore_checkpoint(saved_tracker);

        if error.is_limit_error() && i > 0 {
            failure_count += 1;
            // `None` discards the remaining bundles of all origins, `Some(origin)` only
            // those of the origin of the failed bundle.
            let maybe_sender = if failure_count > max_failures {
                info!(
                    failure_count,
                    max_failures,
                    "Exceeded max bundle failures, discarding all remaining message bundles"
                );
                None
            } else {
                info!(
                    %error,
                    index = i,
                    origin = %incoming_bundle.origin,
                    "Message bundle exceeded block limits and will be discarded for \
                    retry in a later block"
                );
                Some(incoming_bundle.origin)
            };
            Self::discard_remaining_bundles(block, i, maybe_sender);
        } else if incoming_bundle.bundle.is_protected()
            || incoming_bundle.action == MessageAction::Reject
        {
            // Protected bundles cannot be rejected, and a bundle that already fails as
            // `Reject` cannot be downgraded any further.
            return Err(ChainError::ExecutionError(error, context));
        } else {
            info!(
                %error,
                index = i,
                origin = %incoming_bundle.origin,
                "Message bundle failed to execute and will be rejected"
            );
            // `i` is unchanged, so the bundle is executed again as a rejection.
            incoming_bundle.action = MessageAction::Reject;
        }
    }

    // Discarding bundles may have emptied the block.
    ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);

    // For each recipient that was already sent messages, look up the hash and height
    // of the previous block that did so.
    let recipients = block_execution_tracker.recipients();
    let heights = previous_message_blocks_view.multi_get(&recipients).await?;
    let mut recipient_heights = Vec::new();
    let mut indices = Vec::new();
    for (height, recipient) in heights.into_iter().zip(recipients) {
        if let Some(height) = height {
            let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
            indices.push(index);
            recipient_heights.push((recipient, height));
        }
    }
    let hashes = confirmed_log.multi_get(indices).await?;
    let mut previous_message_blocks = BTreeMap::new();
    for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
        let hash = hash.ok_or_else(|| {
            ChainError::CorruptedChainState("missing entry in confirmed_log".into())
        })?;
        previous_message_blocks.insert(recipient, (hash, height));
    }

    // Same lookup for the event streams touched by this block.
    let streams = block_execution_tracker.event_streams();
    let heights = previous_event_blocks_view.multi_get(&streams).await?;
    let mut stream_heights = Vec::new();
    let mut indices = Vec::new();
    for (stream, height) in streams.into_iter().zip(heights) {
        if let Some(height) = height {
            let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
            indices.push(index);
            stream_heights.push((stream, height));
        }
    }
    let hashes = confirmed_log.multi_get(indices).await?;
    let mut previous_event_blocks = BTreeMap::new();
    for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
        let hash = hash.ok_or_else(|| {
            ChainError::CorruptedChainState("missing entry in confirmed_log".into())
        })?;
        previous_event_blocks.insert(stream, (hash, height));
    }

    // The state hash is computed once all transactions were executed.
    let state_hash = {
        #[cfg(with_metrics)]
        let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency_us();
        chain.crypto_hash_mut().await?
    };

    let (messages, oracle_responses, events, blobs, operation_results, resource_tracker) =
        block_execution_tracker.finalize(block.transactions.len());

    Ok((
        BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        },
        resource_tracker,
    ))
}
949
950 fn discard_remaining_bundles(
952 block: &mut ProposedBlock,
953 mut index: usize,
954 maybe_origin: Option<ChainId>,
955 ) {
956 while index < block.transactions.len() {
957 if matches!(
958 &block.transactions[index],
959 Transaction::ReceiveMessages(bundle)
960 if maybe_origin.is_none_or(|origin| bundle.origin == origin)
961 ) {
962 block.transactions.remove(index);
963 } else {
964 index += 1;
965 }
966 }
967 }
968
969 #[instrument(skip_all, fields(
980 chain_id = %self.chain_id(),
981 block_height = %block.height
982 ))]
983 pub async fn execute_block(
984 &mut self,
985 mut block: ProposedBlock,
986 local_time: Timestamp,
987 round: Option<u32>,
988 published_blobs: &[Blob],
989 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
990 policy: BundleExecutionPolicy,
991 ) -> Result<(ProposedBlock, BlockExecutionOutcome, ResourceTracker), ChainError> {
992 assert_eq!(
993 block.chain_id,
994 self.execution_state.context().extra().chain_id()
995 );
996
997 self.initialize_if_needed(local_time).await?;
998
999 let chain_timestamp = *self.execution_state.system.timestamp.get();
1000 ensure!(
1001 chain_timestamp <= block.timestamp,
1002 ChainError::InvalidBlockTimestamp {
1003 parent: chain_timestamp,
1004 new: block.timestamp
1005 }
1006 );
1007 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
1008
1009 ensure!(
1010 block.published_blob_ids()
1011 == published_blobs
1012 .iter()
1013 .map(|blob| blob.id())
1014 .collect::<BTreeSet<_>>(),
1015 ChainError::InternalError("published_blobs mismatch".to_string())
1016 );
1017
1018 if *self.execution_state.system.closed.get() {
1019 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
1020 }
1021
1022 let mandatory_apps_need_accepted_message = self
1023 .current_committee()
1024 .await?
1025 .1
1026 .policy()
1027 .http_request_allow_list
1028 .contains(FLAG_MANDATORY_APPS_NEED_ACCEPTED_MESSAGE);
1029 Self::check_app_permissions(
1030 self.execution_state
1031 .system
1032 .application_permissions
1033 .get()
1034 .await?,
1035 &block,
1036 mandatory_apps_need_accepted_message,
1037 )?;
1038
1039 Self::execute_block_inner(
1040 &mut self.execution_state,
1041 &self.confirmed_log,
1042 &self.previous_message_blocks,
1043 &self.previous_event_blocks,
1044 &mut block,
1045 local_time,
1046 round,
1047 published_blobs,
1048 replaying_oracle_responses,
1049 policy,
1050 )
1051 .await
1052 .map(|(outcome, tracker)| (block, outcome, tracker))
1053 }
1054
1055 async fn process_emitted_events(
1062 &mut self,
1063 block: &Block,
1064 ) -> Result<BTreeSet<StreamId>, ChainError> {
1065 let mut emitted_streams = BTreeMap::<StreamId, BTreeSet<u32>>::new();
1066 for event in block.body.events.iter().flatten() {
1067 emitted_streams
1068 .entry(event.stream_id.clone())
1069 .or_default()
1070 .insert(event.index);
1071 }
1072
1073 let mut updated_streams = BTreeSet::new();
1074 for (stream_id, indices) in emitted_streams {
1075 let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
1077 1
1078 } else {
1079 0
1080 };
1081 let mut current_expected_index = self
1082 .next_expected_events
1083 .get(&stream_id)
1084 .await?
1085 .unwrap_or(initial_index);
1086 for index in indices {
1087 if index == current_expected_index {
1088 updated_streams.insert(stream_id.clone());
1089 current_expected_index = index.saturating_add(1);
1090 }
1091 }
1092 if current_expected_index != 0 {
1093 self.next_expected_events
1094 .insert(&stream_id, current_expected_index)?;
1095 }
1096 }
1097 Ok(updated_streams)
1098 }
1099
1100 #[instrument(skip_all, fields(
1104 chain_id = %self.chain_id(),
1105 block_height = %block.inner().inner().header.height
1106 ))]
1107 pub async fn apply_confirmed_block(
1108 &mut self,
1109 block: &ConfirmedBlock,
1110 local_time: Timestamp,
1111 ) -> Result<BTreeSet<StreamId>, ChainError> {
1112 let hash = block.inner().hash();
1113 let block = block.inner().inner();
1114 if block.header.height == BlockHeight::ZERO {
1115 self.block_zero_executed_at.set(local_time);
1116 }
1117 self.execution_state_hash.set(Some(block.header.state_hash));
1118 let recipients = self.process_outgoing_messages(block).await?;
1119
1120 for recipient in recipients {
1121 self.previous_message_blocks
1122 .insert(&recipient, block.header.height)?;
1123 }
1124 for event in block.body.events.iter().flatten() {
1125 self.previous_event_blocks
1126 .insert(&event.stream_id, block.header.height)?;
1127 }
1128 let updated_streams = self.process_emitted_events(block).await?;
1129 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)
1131 .await?;
1132
1133 let tip = self.tip_state.get_mut();
1135 tip.block_hash = Some(hash);
1136 tip.next_block_height.try_add_assign_one()?;
1137 tip.update_counters(&block.body.transactions, &block.body.messages)?;
1138 self.confirmed_log.push(hash);
1139 self.preprocessed_blocks.remove(&block.header.height)?;
1140 Ok(updated_streams)
1141 }
1142
1143 #[instrument(skip_all, fields(
1146 chain_id = %self.chain_id(),
1147 block_height = %block.inner().inner().header.height
1148 ))]
1149 pub async fn preprocess_block(
1150 &mut self,
1151 block: &ConfirmedBlock,
1152 ) -> Result<BTreeSet<StreamId>, ChainError> {
1153 let hash = block.inner().hash();
1154 let block = block.inner().inner();
1155 let height = block.header.height;
1156 if height < self.tip_state.get().next_block_height {
1157 return Ok(BTreeSet::new());
1158 }
1159 self.process_outgoing_messages(block).await?;
1160 let updated_streams = self.process_emitted_events(block).await?;
1161 self.preprocessed_blocks.insert(&height, hash)?;
1162 Ok(updated_streams)
1163 }
1164
1165 pub async fn is_child(&self) -> Result<bool, ChainError> {
1167 let description = self.execution_state.system.description.get().await?;
1168 let Some(description) = description else {
1169 return Ok(true);
1171 };
1172 Ok(description.is_child())
1173 }
1174
1175 #[instrument(skip_all, fields(
1177 block_height = %block.height,
1178 num_transactions = %block.transactions.len()
1179 ))]
1180 fn check_app_permissions(
1181 app_permissions: &ApplicationPermissions,
1182 block: &ProposedBlock,
1183 mandatory_apps_need_accepted_message: bool,
1184 ) -> Result<(), ChainError> {
1185 let mut mandatory = HashSet::<ApplicationId>::from_iter(
1186 app_permissions.mandatory_applications.iter().copied(),
1187 );
1188 for transaction in &block.transactions {
1189 match transaction {
1190 Transaction::ExecuteOperation(operation)
1191 if operation.is_exempt_from_permissions() =>
1192 {
1193 mandatory.clear()
1194 }
1195 Transaction::ExecuteOperation(operation) => {
1196 ensure!(
1197 app_permissions.can_execute_operations(&operation.application_id()),
1198 ChainError::AuthorizedApplications(
1199 app_permissions.execute_operations.clone().unwrap()
1200 )
1201 );
1202 if let Operation::User { application_id, .. } = operation {
1203 mandatory.remove(application_id);
1204 }
1205 }
1206 Transaction::ReceiveMessages(incoming_bundle)
1207 if !mandatory_apps_need_accepted_message
1208 || incoming_bundle.action == MessageAction::Accept =>
1209 {
1210 for pending in incoming_bundle.messages() {
1211 if let Message::User { application_id, .. } = &pending.message {
1212 mandatory.remove(application_id);
1213 }
1214 }
1215 }
1216 Transaction::ReceiveMessages(_) => {}
1217 }
1218 }
1219 ensure!(
1220 mandatory.is_empty(),
1221 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1222 );
1223 Ok(())
1224 }
1225
1226 #[instrument(skip_all, fields(
1231 chain_id = %self.chain_id(),
1232 next_block_height = %self.tip_state.get().next_block_height,
1233 ))]
1234 pub async fn block_hashes(
1235 &self,
1236 heights: impl IntoIterator<Item = BlockHeight>,
1237 ) -> Result<Vec<CryptoHash>, ChainError> {
1238 let next_height = self.tip_state.get().next_block_height;
1239 let (confirmed_heights, unconfirmed_heights) = heights
1241 .into_iter()
1242 .partition::<Vec<_>, _>(|height| *height < next_height);
1243 let confirmed_indices = confirmed_heights
1244 .into_iter()
1245 .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1246 .collect::<Result<_, _>>()?;
1247 let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1248 let unconfirmed_hashes = self
1250 .preprocessed_blocks
1251 .multi_get(&unconfirmed_heights)
1252 .await?;
1253 Ok(confirmed_hashes
1254 .into_iter()
1255 .chain(unconfirmed_hashes)
1256 .flatten()
1257 .collect())
1258 }
1259
1260 async fn reset_chain_manager(
1262 &mut self,
1263 next_height: BlockHeight,
1264 local_time: Timestamp,
1265 ) -> Result<(), ChainError> {
1266 let maybe_committee = self.execution_state.system.current_committee().await?;
1267 let ownership = self.execution_state.system.ownership.get().await?.clone();
1268 let fallback_owners = maybe_committee
1269 .iter()
1270 .flat_map(|(_, committee)| committee.account_keys_and_weights());
1271 self.pending_validated_blobs.clear();
1272 self.pending_proposed_blobs.clear();
1273 self.manager
1274 .reset(ownership, next_height, local_time, fallback_owners)
1275 }
1276
1277 #[instrument(skip_all, fields(
1281 chain_id = %self.chain_id(),
1282 block_height = %block.header.height
1283 ))]
1284 async fn process_outgoing_messages(
1285 &mut self,
1286 block: &Block,
1287 ) -> Result<Vec<ChainId>, ChainError> {
1288 let recipients = block.recipients();
1291 let block_height = block.header.height;
1292 let next_height = self.tip_state.get().next_block_height;
1293
1294 let outbox_counters = self.outbox_counters.get_mut();
1296 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1297 let targets = recipients.into_iter().collect::<Vec<_>>();
1298 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1299 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1300 if block_height > next_height {
1301 if *outbox.next_height_to_schedule.get() > block_height {
1304 continue; }
1306 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1307 {
1308 Some(height) if height < next_height => {
1311 let index =
1312 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1313 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1314 ChainError::CorruptedChainState("missing entry in confirmed_log".into())
1315 })?)
1316 }
1317 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1320 || {
1321 ChainError::CorruptedChainState(
1322 "missing entry in preprocessed_blocks".into(),
1323 )
1324 },
1325 )?),
1326 None => None, };
1328 match (
1330 maybe_prev_hash,
1331 block.body.previous_message_blocks.get(target),
1332 ) {
1333 (None, None) => {
1334 }
1337 (Some(_), None) => {
1338 return Err(ChainError::CorruptedChainState(
1341 "block indicates no previous message block,\
1342 but we have one in the outbox"
1343 .into(),
1344 ));
1345 }
1346 (None, Some((_, prev_msg_block_height))) => {
1347 if *prev_msg_block_height >= next_height {
1352 continue;
1353 }
1354 }
1355 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1356 if prev_hash != prev_msg_block_hash {
1358 continue;
1359 }
1360 }
1361 }
1362 }
1363 if outbox.schedule_message(block_height)? {
1364 *outbox_counters.entry(block_height).or_default() += 1;
1365 nonempty_outboxes.insert(*target);
1366 }
1367 #[cfg(with_metrics)]
1368 crate::outbox::metrics::OUTBOX_SIZE
1369 .with_label_values(&[])
1370 .observe(outbox.queue.count() as f64);
1371 }
1372
1373 #[cfg(with_metrics)]
1374 metrics::NUM_OUTBOXES
1375 .with_label_values(&[])
1376 .observe(nonempty_outboxes.len() as f64);
1377 Ok(targets)
1378 }
1379}
1380
/// Checks that `EMPTY_BLOCK_SIZE` equals the BCS-serialized size of a first
/// block with a default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let empty_block = crate::block::Block::new(proposed_block, outcome);

    let serialized_len = bcs::serialized_size(&empty_block).unwrap();
    assert_eq!(serialized_len, EMPTY_BLOCK_SIZE);
}