1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
6 sync::Arc,
7};
8
9use allocative::Allocative;
10use linera_base::{
11 crypto::{CryptoHash, ValidatorPublicKey},
12 data_types::{
13 ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight, Epoch,
14 OracleResponse, Timestamp,
15 },
16 ensure,
17 identifiers::{AccountOwner, ApplicationId, BlobType, ChainId, StreamId},
18 ownership::ChainOwnership,
19};
20use linera_execution::{
21 committee::Committee, ExecutionRuntimeContext, ExecutionStateView, Message, Operation,
22 OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController, ResourceTracker,
23 ServiceRuntimeEndpoint, TransactionTracker,
24};
25use linera_views::{
26 bucket_queue_view::BucketQueueView,
27 context::Context,
28 log_view::LogView,
29 map_view::MapView,
30 reentrant_collection_view::{ReadGuardedView, ReentrantCollectionView},
31 register_view::RegisterView,
32 set_view::SetView,
33 views::{ClonableView, RootView, View},
34};
35use serde::{Deserialize, Serialize};
36use tracing::instrument;
37
38use crate::{
39 block::{Block, ConfirmedBlock},
40 block_tracker::BlockExecutionTracker,
41 data_types::{
42 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageBundle, ProposedBlock,
43 Transaction,
44 },
45 inbox::{Cursor, InboxError, InboxStateView},
46 manager::ChainManager,
47 outbox::OutboxStateView,
48 pending_blobs::PendingBlobsView,
49 ChainError, ChainExecutionContext, ExecutionError, ExecutionResultExt,
50};
51
52#[cfg(test)]
53#[path = "unit_tests/chain_tests.rs"]
54mod chain_tests;
55
56#[cfg(with_metrics)]
57use linera_base::prometheus_util::MeasureLatency;
58
/// Prometheus metrics recorded while executing blocks and updating inboxes/outboxes.
///
/// The whole module is only compiled when the `with_metrics` cfg is set, so the items
/// inside do not need to repeat that attribute.
#[cfg(with_metrics)]
pub(crate) mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
        register_int_counter_vec,
    };
    use linera_execution::ResourceTracker;
    use prometheus::{HistogramVec, IntCounterVec};

    /// Counter incremented once per call to [`track_block_metrics`].
    pub static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec("num_blocks_executed", "Number of blocks executed", &[])
    });

    /// Histogram of the latency of a whole block execution.
    pub static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "block_execution_latency",
            "Block execution latency",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// Histogram of the latency of executing a single message.
    ///
    /// NOTE(review): not observed anywhere in this module; presumably recorded by the
    /// code that executes individual transactions — confirm.
    pub static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "message_execution_latency",
            "Message execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the latency of executing a single operation.
    ///
    /// NOTE(review): not observed anywhere in this module; presumably recorded by the
    /// code that executes individual transactions — confirm.
    pub static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "operation_execution_latency",
            "Operation execution latency",
            &[],
            exponential_bucket_latencies(50.0),
        )
    });

    /// Histogram of the Wasm fuel reported by the resource tracker for a block.
    pub static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "wasm_fuel_used_per_block",
            "Wasm fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the EVM fuel reported by the resource tracker for a block.
    pub static EVM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "evm_fuel_used_per_block",
            "EVM fuel used per block",
            &[],
            exponential_bucket_interval(10.0, 1_000_000.0),
        )
    });

    /// Histogram of the number of read operations reported for a block.
    pub static VM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_num_reads_per_block",
            "VM number of reads per block",
            &[],
            exponential_bucket_interval(0.1, 100.0),
        )
    });

    /// Histogram of the number of bytes read reported for a block.
    pub static VM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_read_per_block",
            "VM number of bytes read per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the number of bytes written reported for a block.
    pub static VM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "vm_bytes_written_per_block",
            "VM number of bytes written per block",
            &[],
            exponential_bucket_interval(0.1, 10_000_000.0),
        )
    });

    /// Histogram of the time spent recomputing the execution state hash.
    pub static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "state_hash_computation_latency",
            "Time to recompute the state hash",
            &[],
            exponential_bucket_latencies(500.0),
        )
    });

    /// Histogram of the number of inboxes of a chain, sampled when inboxes are updated.
    pub static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_inboxes",
            "Number of inboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Histogram of the number of outboxes of a chain, sampled when outboxes are updated.
    pub static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "num_outboxes",
            "Number of outboxes",
            &[],
            exponential_bucket_interval(1.0, 10_000.0),
        )
    });

    /// Records the per-block resource usage held by `tracker`.
    ///
    /// Increments [`NUM_BLOCKS_EXECUTED`] and observes the tracker's fuel, read and
    /// write figures in the corresponding per-block histograms.
    pub(crate) fn track_block_metrics(tracker: &ResourceTracker) {
        NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();
        // Histograms take `f64` samples; the integer counters are converted with `as`.
        WASM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.wasm_fuel as f64);
        EVM_FUEL_USED_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.evm_fuel as f64);
        VM_NUM_READS_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.read_operations as f64);
        VM_BYTES_READ_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_read as f64);
        VM_BYTES_WRITTEN_PER_BLOCK
            .with_label_values(&[])
            .observe(tracker.bytes_written as f64);
    }
}
194
/// The BCS-serialized size of an empty block, i.e. the size that the `empty_block_size`
/// test in this file asserts for a first block with a default execution outcome.
pub(crate) const EMPTY_BLOCK_SIZE: usize = 94;
197
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
/// A [`BundleInInbox`] reference paired with a [`Timestamp`].
#[derive(Debug, Clone, Serialize, Deserialize, Allocative)]
pub struct TimestampedBundleInInbox {
    /// The bundle being referred to (origin chain and cursor).
    pub entry: BundleInInbox,
    /// NOTE(review): presumably the time at which the bundle was first seen — confirm
    /// against the code that fills this field.
    pub seen: Timestamp,
}
207
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
/// A reference to a message bundle in an inbox: the origin chain plus a [`Cursor`].
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct BundleInInbox {
    /// The chain the bundle comes from.
    pub origin: ChainId,
    /// NOTE(review): presumably the position of the bundle within the origin's inbox —
    /// confirm against the `Cursor` definition.
    pub cursor: Cursor,
}
217
/// Const parameter given to the `BucketQueueView` that stores
/// `ChainStateView::unskippable_bundles`.
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
221
/// A view accessing the state of a chain.
///
/// The field comments below describe how each sub-view is used by the methods visible
/// in this file; fields that are not touched here are marked for review.
#[cfg_attr(
    with_graphql,
    derive(async_graphql::SimpleObject),
    graphql(cache_control(no_cache))
)]
#[derive(Debug, RootView, ClonableView, Allocative)]
#[allocative(bound = "C")]
pub struct ChainStateView<C>
where
    C: Clone + Context + 'static,
{
    /// Execution state; its `system` part provides ownership, committee, timestamp and
    /// the `closed` flag used by the methods of this type.
    pub execution_state: ExecutionStateView<C>,
    /// Hash of the execution state: set from `crypto_hash_mut()` on initialization and
    /// from the header's `state_hash` when a confirmed block is applied.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// Block-chaining state: hash of the last applied block, next height and counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// Chain manager; reset on initialization and after every applied confirmed block.
    pub manager: ChainManager<C>,
    /// Pending blobs for validated blocks; cleared whenever the chain manager is reset.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// Pending blobs for proposed blocks, keyed by `AccountOwner`; cleared whenever the
    /// chain manager is reset.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// Hashes of the confirmed blocks, appended in order by `apply_confirmed_block` and
    /// looked up by block height.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// Origin chain and height of received message bundles, appended by
    /// `receive_message_bundle` when requested by the caller.
    pub received_log: LogView<C, ChainAndHeight>,
    /// Per-validator tracker values; `update_received_certificate_trackers` only ever
    /// increases them.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// Inboxes, one per origin chain.
    pub inboxes: ReentrantCollectionView<C, ChainId, InboxStateView<C>>,
    /// NOTE(review): not used in the visible part of this file; presumably the queue of
    /// received bundles that must not be skipped — confirm.
    pub unskippable_bundles:
        BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
    /// NOTE(review): not used in the visible part of this file; presumably unskippable
    /// bundles that were already removed from their inbox — confirm.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// For each recipient chain, the height of the latest applied block that sent
    /// messages to it.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// For each event stream, the height of the latest applied block that published an
    /// event on it.
    pub previous_event_blocks: MapView<C, StreamId, BlockHeight>,
    /// Outboxes, one per target chain.
    pub outboxes: ReentrantCollectionView<C, ChainId, OutboxStateView<C>>,
    /// For each block height, the number of outboxes in which that height is scheduled
    /// and not yet marked as received; entries are removed when they reach zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// Targets whose outbox currently has scheduled heights; an entry is removed once
    /// the outbox queue is found empty.
    pub nonempty_outboxes: RegisterView<C, BTreeSet<ChainId>>,

    /// Hashes of blocks preprocessed ahead of the tip, by height; the entry is removed
    /// when the block at that height is applied.
    pub preprocessed_blocks: MapView<C, BlockHeight, CryptoHash>,
}
280
#[cfg_attr(with_graphql, derive(async_graphql::SimpleObject))]
/// Block-chaining state of a chain: where the next block must attach, plus running
/// totals over all blocks applied so far.
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, Allocative)]
pub struct ChainTipState {
    /// Hash of the latest applied block, or `None` if no block was applied yet.
    pub block_hash: Option<CryptoHash>,
    /// Height expected for the next block.
    pub next_block_height: BlockHeight,
    /// Total number of `ReceiveMessages` transactions in the applied blocks.
    pub num_incoming_bundles: u32,
    /// Total number of `ExecuteOperation` transactions in the applied blocks.
    pub num_operations: u32,
    /// Total number of outgoing messages produced by the applied blocks.
    pub num_outgoing_messages: u32,
}
296
297impl ChainTipState {
298 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
301 ensure!(
302 new_block.height == self.next_block_height,
303 ChainError::UnexpectedBlockHeight {
304 expected_block_height: self.next_block_height,
305 found_block_height: new_block.height
306 }
307 );
308 ensure!(
309 new_block.previous_block_hash == self.block_hash,
310 ChainError::UnexpectedPreviousBlockHash
311 );
312 Ok(())
313 }
314
315 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
318 ensure!(
319 self.next_block_height >= height,
320 ChainError::MissingEarlierBlocks {
321 current_block_height: self.next_block_height,
322 }
323 );
324 Ok(self.next_block_height > height)
325 }
326
327 pub fn update_counters(
329 &mut self,
330 transactions: &[Transaction],
331 messages: &[Vec<OutgoingMessage>],
332 ) -> Result<(), ChainError> {
333 let mut num_incoming_bundles = 0u32;
334 let mut num_operations = 0u32;
335
336 for transaction in transactions {
337 match transaction {
338 Transaction::ReceiveMessages(_) => {
339 num_incoming_bundles = num_incoming_bundles
340 .checked_add(1)
341 .ok_or(ArithmeticError::Overflow)?;
342 }
343 Transaction::ExecuteOperation(_) => {
344 num_operations = num_operations
345 .checked_add(1)
346 .ok_or(ArithmeticError::Overflow)?;
347 }
348 }
349 }
350
351 self.num_incoming_bundles = self
352 .num_incoming_bundles
353 .checked_add(num_incoming_bundles)
354 .ok_or(ArithmeticError::Overflow)?;
355
356 self.num_operations = self
357 .num_operations
358 .checked_add(num_operations)
359 .ok_or(ArithmeticError::Overflow)?;
360
361 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
362 .map_err(|_| ArithmeticError::Overflow)?;
363 self.num_outgoing_messages = self
364 .num_outgoing_messages
365 .checked_add(num_outgoing_messages)
366 .ok_or(ArithmeticError::Overflow)?;
367
368 Ok(())
369 }
370}
371
372impl<C> ChainStateView<C>
373where
374 C: Context + Clone + 'static,
375 C::Extra: ExecutionRuntimeContext,
376{
377 pub fn chain_id(&self) -> ChainId {
379 self.context().extra().chain_id()
380 }
381
382 #[instrument(skip_all, fields(
383 chain_id = %self.chain_id(),
384 ))]
385 pub async fn query_application(
386 &mut self,
387 local_time: Timestamp,
388 query: Query,
389 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
390 ) -> Result<QueryOutcome, ChainError> {
391 let context = QueryContext {
392 chain_id: self.chain_id(),
393 next_block_height: self.tip_state.get().next_block_height,
394 local_time,
395 };
396 self.execution_state
397 .query_application(context, query, service_runtime_endpoint)
398 .await
399 .with_execution_context(ChainExecutionContext::Query)
400 }
401
402 #[instrument(skip_all, fields(
403 chain_id = %self.chain_id(),
404 application_id = %application_id
405 ))]
406 pub async fn describe_application(
407 &mut self,
408 application_id: ApplicationId,
409 ) -> Result<ApplicationDescription, ChainError> {
410 self.execution_state
411 .system
412 .describe_application(application_id, &mut TransactionTracker::default())
413 .await
414 .with_execution_context(ChainExecutionContext::DescribeApplication)
415 }
416
417 #[instrument(skip_all, fields(
418 chain_id = %self.chain_id(),
419 target = %target,
420 height = %height
421 ))]
422 pub async fn mark_messages_as_received(
423 &mut self,
424 target: &ChainId,
425 height: BlockHeight,
426 ) -> Result<bool, ChainError> {
427 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
428 let updates = outbox.mark_messages_as_received(height).await?;
429 if updates.is_empty() {
430 return Ok(false);
431 }
432 for update in updates {
433 let counter = self
434 .outbox_counters
435 .get_mut()
436 .get_mut(&update)
437 .ok_or_else(|| {
438 ChainError::InternalError("message counter should be present".into())
439 })?;
440 *counter = counter.checked_sub(1).ok_or(ArithmeticError::Underflow)?;
441 if *counter == 0 {
442 self.outbox_counters.get_mut().remove(&update);
444 }
445 }
446 if outbox.queue.count() == 0 {
447 self.nonempty_outboxes.get_mut().remove(target);
448 if *outbox.next_height_to_schedule.get() <= self.tip_state.get().next_block_height {
450 self.outboxes.remove_entry(target)?;
451 }
452 }
453 #[cfg(with_metrics)]
454 metrics::NUM_OUTBOXES
455 .with_label_values(&[])
456 .observe(self.outboxes.count().await? as f64);
457 Ok(true)
458 }
459
460 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
463 tracing::debug!(
464 "Messages left in {:.8}'s outbox: {:?}",
465 self.chain_id(),
466 self.outbox_counters.get()
467 );
468 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
469 key > &height
470 } else {
471 true
472 }
473 }
474
475 pub fn is_active(&self) -> bool {
477 self.execution_state.system.is_active()
478 }
479
480 pub async fn initialize_if_needed(&mut self, local_time: Timestamp) -> Result<(), ChainError> {
482 if self
484 .execution_state
485 .system
486 .initialize_chain(self.chain_id())
487 .await
488 .with_execution_context(ChainExecutionContext::Block)?
489 {
490 return Ok(());
492 }
493 let hash = self.execution_state.crypto_hash_mut().await?;
495 self.execution_state_hash.set(Some(hash));
496 let maybe_committee = self.execution_state.system.current_committee().into_iter();
497 self.manager.reset(
499 self.execution_state.system.ownership.get().clone(),
500 BlockHeight(0),
501 local_time,
502 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
503 )?;
504 Ok(())
505 }
506
507 pub async fn next_block_height_to_receive(
508 &self,
509 origin: &ChainId,
510 ) -> Result<BlockHeight, ChainError> {
511 let inbox = self.inboxes.try_load_entry(origin).await?;
512 match inbox {
513 Some(inbox) => inbox.next_block_height_to_receive(),
514 None => Ok(BlockHeight::ZERO),
515 }
516 }
517
518 pub async fn next_height_to_preprocess(&self) -> Result<BlockHeight, ChainError> {
522 if let Some(height) = self.preprocessed_blocks.indices().await?.last() {
523 return Ok(height.saturating_add(BlockHeight(1)));
524 }
525 Ok(self.tip_state.get().next_block_height)
526 }
527
528 pub async fn last_anticipated_block_height(
529 &self,
530 origin: &ChainId,
531 ) -> Result<Option<BlockHeight>, ChainError> {
532 let inbox = self.inboxes.try_load_entry(origin).await?;
533 match inbox {
534 Some(inbox) => match inbox.removed_bundles.back().await? {
535 Some(bundle) => Ok(Some(bundle.height)),
536 None => Ok(None),
537 },
538 None => Ok(None),
539 }
540 }
541
    /// Adds a message bundle coming from `origin` to the corresponding inbox.
    ///
    /// The chain is initialized first if needed. If `add_to_received_log` is set, the
    /// origin and height of the bundle are also appended to `received_log`.
    ///
    /// # Panics
    ///
    /// Panics if `bundle.messages` is empty.
    ///
    /// # Errors
    ///
    /// Returns any initialization error other than the tolerated missing-blob case
    /// described below, view errors, and a `ChainError::InternalError` wrapping any
    /// other inbox error.
    #[instrument(skip_all, fields(
        chain_id = %self.chain_id(),
        origin = %origin,
        bundle_height = %bundle.height
    ))]
    pub async fn receive_message_bundle(
        &mut self,
        origin: &ChainId,
        bundle: MessageBundle,
        local_time: Timestamp,
        add_to_received_log: bool,
    ) -> Result<(), ChainError> {
        assert!(!bundle.messages.is_empty());
        let chain_id = self.chain_id();
        tracing::trace!(
            "Processing new messages to {chain_id:.8} from {origin} at height {}",
            bundle.height,
        );
        // Built up front because `bundle` is moved into the inbox below.
        let chain_and_height = ChainAndHeight {
            chain_id: *origin,
            height: bundle.height,
        };

        match self.initialize_if_needed(local_time).await {
            Ok(_) => (),
            // Tolerated failure: the only missing blobs are chain-description blobs
            // whose hash equals this chain's ID. The inbox is still updated then.
            Err(ChainError::ExecutionError(exec_err, _))
                if matches!(*exec_err, ExecutionError::BlobsNotFound(ref blobs)
                if blobs.iter().all(|blob_id| {
                    blob_id.blob_type == BlobType::ChainDescription && blob_id.hash == chain_id.0
                })) => {}
            // Any other error is returned unchanged.
            err => {
                return err;
            }
        }

        // Load (or create) the inbox for this origin.
        let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
        #[cfg(with_metrics)]
        metrics::NUM_INBOXES
            .with_label_values(&[])
            .observe(self.inboxes.count().await? as f64);
        inbox
            .add_bundle(bundle)
            .await
            .map_err(|error| match error {
                // View errors keep their own variant; everything else is reported as
                // an internal error.
                InboxError::ViewError(error) => ChainError::ViewError(error),
                error => ChainError::InternalError(format!(
                    "while processing messages in certified block: {error}"
                )),
            })?;

        if add_to_received_log {
            self.received_log.push(chain_and_height);
        }
        Ok(())
    }
607
608 pub fn update_received_certificate_trackers(
610 &mut self,
611 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
612 ) {
613 for (name, tracker) in new_trackers {
614 self.received_certificate_trackers
615 .get_mut()
616 .entry(name)
617 .and_modify(|t| {
618 if tracker > *t {
621 *t = tracker;
622 }
623 })
624 .or_insert(tracker);
625 }
626 }
627
628 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
629 self.execution_state
630 .system
631 .current_committee()
632 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
633 }
634
635 pub fn ownership(&self) -> &ChainOwnership {
636 self.execution_state.system.ownership.get()
637 }
638
639 #[instrument(skip_all, fields(
645 chain_id = %self.chain_id(),
646 ))]
647 pub async fn remove_bundles_from_inboxes(
648 &mut self,
649 timestamp: Timestamp,
650 must_be_present: bool,
651 incoming_bundles: impl IntoIterator<Item = &IncomingBundle>,
652 ) -> Result<(), ChainError> {
653 let chain_id = self.chain_id();
654 let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
655 for IncomingBundle { bundle, origin, .. } in incoming_bundles {
656 ensure!(
657 bundle.timestamp <= timestamp,
658 ChainError::IncorrectBundleTimestamp {
659 chain_id,
660 bundle_timestamp: bundle.timestamp,
661 block_timestamp: timestamp,
662 }
663 );
664 let bundles = bundles_by_origin.entry(*origin).or_default();
665 bundles.push(bundle);
666 }
667 let origins = bundles_by_origin.keys().copied().collect::<Vec<_>>();
668 let inboxes = self.inboxes.try_load_entries_mut(&origins).await?;
669 for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
670 tracing::trace!(
671 "Removing [{}] from {chain_id:.8}'s inbox for {origin:}",
672 bundles
673 .iter()
674 .map(|bundle| bundle.height.to_string())
675 .collect::<Vec<_>>()
676 .join(", ")
677 );
678 for bundle in bundles {
679 let was_present = inbox
681 .remove_bundle(bundle)
682 .await
683 .map_err(|error| (chain_id, origin, error))?;
684 if must_be_present {
685 ensure!(
686 was_present,
687 ChainError::MissingCrossChainUpdate {
688 chain_id,
689 origin,
690 height: bundle.height,
691 }
692 );
693 }
694 }
695 }
696 #[cfg(with_metrics)]
697 metrics::NUM_INBOXES
698 .with_label_values(&[])
699 .observe(self.inboxes.count().await? as f64);
700 Ok(())
701 }
702
703 pub fn nonempty_outbox_chain_ids(&self) -> Vec<ChainId> {
705 self.nonempty_outboxes.get().iter().copied().collect()
706 }
707
708 pub async fn load_outboxes(
710 &self,
711 targets: &[ChainId],
712 ) -> Result<Vec<ReadGuardedView<OutboxStateView<C>>>, ChainError> {
713 let vec_of_options = self.outboxes.try_load_entries(targets).await?;
714 let optional_vec = vec_of_options.into_iter().collect::<Option<Vec<_>>>();
715 optional_vec.ok_or_else(|| ChainError::InternalError("Missing outboxes".into()))
716 }
717
    /// Executes `block` against the execution state `chain` and returns the outcome.
    ///
    /// This is an associated function taking the individual sub-views rather than
    /// `&mut self`, so that the execution state can be borrowed mutably while the logs
    /// and maps are borrowed immutably.
    ///
    /// Steps: set the system timestamp to the block's, build a resource controller
    /// from the current committee's policy, check and register the published blobs,
    /// execute every transaction, look up the previous message/event blocks for the
    /// recipients and streams touched, and finally compute the new state hash.
    ///
    /// # Errors
    ///
    /// * `ChainError::InactiveChain` if there is no current committee.
    /// * `ChainError::InternalError` if a recorded previous block height has no entry
    ///   in `confirmed_log`.
    /// * Any error from blob size checks, transaction execution or the views.
    #[instrument(skip_all, fields(
        chain_id = %block.chain_id,
        block_height = %block.height
    ))]
    #[expect(clippy::too_many_arguments)]
    async fn execute_block_inner(
        chain: &mut ExecutionStateView<C>,
        confirmed_log: &LogView<C, CryptoHash>,
        previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
        previous_event_blocks_view: &MapView<C, StreamId, BlockHeight>,
        block: &ProposedBlock,
        local_time: Timestamp,
        round: Option<u32>,
        published_blobs: &[Blob],
        replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
    ) -> Result<BlockExecutionOutcome, ChainError> {
        // The guard measures the latency of the whole function.
        #[cfg(with_metrics)]
        let _execution_latency = metrics::BLOCK_EXECUTION_LATENCY.measure_latency();
        chain.system.timestamp.set(block.timestamp);

        // The resource policy comes from the current committee.
        let policy = chain
            .system
            .current_committee()
            .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?
            .1
            .policy()
            .clone();

        let mut resource_controller = ResourceController::new(
            Arc::new(policy),
            ResourceTracker::default(),
            block.authenticated_signer,
        );

        // Check the size of every published blob and mark it as used.
        for blob in published_blobs {
            let blob_id = blob.id();
            resource_controller
                .policy()
                .check_blob_size(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
            chain.system.used_blobs.insert(&blob_id)?;
        }

        let mut block_execution_tracker = BlockExecutionTracker::new(
            &mut resource_controller,
            published_blobs
                .iter()
                .map(|blob| (blob.id(), blob))
                .collect(),
            local_time,
            replaying_oracle_responses,
            block,
        )?;

        for transaction in block.transaction_refs() {
            block_execution_tracker
                .execute_transaction(transaction, round, chain)
                .await?;
        }

        // For each recipient with a recorded previous message block, fetch that
        // block's hash from the confirmed log.
        let recipients = block_execution_tracker.recipients();
        let heights = previous_message_blocks_view.multi_get(&recipients).await?;
        let mut recipient_heights = Vec::new();
        let mut indices = Vec::new();
        for (height, recipient) in heights.into_iter().zip(recipients) {
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                recipient_heights.push((recipient, height));
            }
        }
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_message_blocks = BTreeMap::new();
        for (hash, (recipient, height)) in hashes.into_iter().zip(recipient_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_message_blocks.insert(recipient, (hash, height));
        }

        // Same lookup for the event streams touched by the block.
        let streams = block_execution_tracker.event_streams();
        let heights = previous_event_blocks_view.multi_get(&streams).await?;
        let mut stream_heights = Vec::new();
        let mut indices = Vec::new();
        for (stream, height) in streams.into_iter().zip(heights) {
            if let Some(height) = height {
                let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
                indices.push(index);
                stream_heights.push((stream, height));
            }
        }
        let hashes = confirmed_log.multi_get(indices).await?;
        let mut previous_event_blocks = BTreeMap::new();
        for (hash, (stream, height)) in hashes.into_iter().zip(stream_heights) {
            let hash = hash.ok_or_else(|| {
                ChainError::InternalError("missing entry in confirmed_log".into())
            })?;
            previous_event_blocks.insert(stream, (hash, height));
        }

        // Hash the execution state after all transactions were applied.
        let state_hash = {
            #[cfg(with_metrics)]
            let _hash_latency = metrics::STATE_HASH_COMPUTATION_LATENCY.measure_latency();
            chain.crypto_hash_mut().await?
        };

        let (messages, oracle_responses, events, blobs, operation_results) =
            block_execution_tracker.finalize();

        Ok(BlockExecutionOutcome {
            messages,
            previous_message_blocks,
            previous_event_blocks,
            state_hash,
            oracle_responses,
            events,
            blobs,
            operation_results,
        })
    }
842
843 #[instrument(skip_all, fields(
846 chain_id = %self.chain_id(),
847 block_height = %block.height
848 ))]
849 pub async fn execute_block(
850 &mut self,
851 block: &ProposedBlock,
852 local_time: Timestamp,
853 round: Option<u32>,
854 published_blobs: &[Blob],
855 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
856 ) -> Result<BlockExecutionOutcome, ChainError> {
857 assert_eq!(
858 block.chain_id,
859 self.execution_state.context().extra().chain_id()
860 );
861
862 self.initialize_if_needed(local_time).await?;
863
864 let chain_timestamp = *self.execution_state.system.timestamp.get();
865 ensure!(
866 chain_timestamp <= block.timestamp,
867 ChainError::InvalidBlockTimestamp {
868 parent: chain_timestamp,
869 new: block.timestamp
870 }
871 );
872 ensure!(!block.transactions.is_empty(), ChainError::EmptyBlock);
873
874 ensure!(
875 block.published_blob_ids()
876 == published_blobs
877 .iter()
878 .map(|blob| blob.id())
879 .collect::<BTreeSet<_>>(),
880 ChainError::InternalError("published_blobs mismatch".to_string())
881 );
882
883 if *self.execution_state.system.closed.get() {
884 ensure!(block.has_only_rejected_messages(), ChainError::ClosedChain);
885 }
886
887 Self::check_app_permissions(
888 self.execution_state.system.application_permissions.get(),
889 block,
890 )?;
891
892 Self::execute_block_inner(
893 &mut self.execution_state,
894 &self.confirmed_log,
895 &self.previous_message_blocks,
896 &self.previous_event_blocks,
897 block,
898 local_time,
899 round,
900 published_blobs,
901 replaying_oracle_responses,
902 )
903 .await
904 }
905
906 #[instrument(skip_all, fields(
910 chain_id = %self.chain_id(),
911 block_height = %block.inner().inner().header.height
912 ))]
913 pub async fn apply_confirmed_block(
914 &mut self,
915 block: &ConfirmedBlock,
916 local_time: Timestamp,
917 ) -> Result<(), ChainError> {
918 let hash = block.inner().hash();
919 let block = block.inner().inner();
920 self.execution_state_hash.set(Some(block.header.state_hash));
921 let recipients = self.process_outgoing_messages(block).await?;
922
923 for recipient in recipients {
924 self.previous_message_blocks
925 .insert(&recipient, block.header.height)?;
926 }
927 for event in block.body.events.iter().flatten() {
928 self.previous_event_blocks
929 .insert(&event.stream_id, block.header.height)?;
930 }
931 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
933
934 let tip = self.tip_state.get_mut();
936 tip.block_hash = Some(hash);
937 tip.next_block_height.try_add_assign_one()?;
938 tip.update_counters(&block.body.transactions, &block.body.messages)?;
939 self.confirmed_log.push(hash);
940 self.preprocessed_blocks.remove(&block.header.height)?;
941 Ok(())
942 }
943
944 #[instrument(skip_all, fields(
946 chain_id = %self.chain_id(),
947 block_height = %block.inner().inner().header.height
948 ))]
949 pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> {
950 let hash = block.inner().hash();
951 let block = block.inner().inner();
952 let height = block.header.height;
953 if height < self.tip_state.get().next_block_height {
954 return Ok(());
955 }
956 self.process_outgoing_messages(block).await?;
957 self.preprocessed_blocks.insert(&height, hash)?;
958 Ok(())
959 }
960
961 pub fn is_child(&self) -> bool {
963 let Some(description) = self.execution_state.system.description.get() else {
964 return true;
966 };
967 description.is_child()
968 }
969
970 #[instrument(skip_all, fields(
972 block_height = %block.height,
973 num_transactions = %block.transactions.len()
974 ))]
975 fn check_app_permissions(
976 app_permissions: &ApplicationPermissions,
977 block: &ProposedBlock,
978 ) -> Result<(), ChainError> {
979 let mut mandatory = HashSet::<ApplicationId>::from_iter(
980 app_permissions.mandatory_applications.iter().copied(),
981 );
982 for transaction in &block.transactions {
983 match transaction {
984 Transaction::ExecuteOperation(operation)
985 if operation.is_exempt_from_permissions() =>
986 {
987 mandatory.clear()
988 }
989 Transaction::ExecuteOperation(operation) => {
990 ensure!(
991 app_permissions.can_execute_operations(&operation.application_id()),
992 ChainError::AuthorizedApplications(
993 app_permissions.execute_operations.clone().unwrap()
994 )
995 );
996 if let Operation::User { application_id, .. } = operation {
997 mandatory.remove(application_id);
998 }
999 }
1000 Transaction::ReceiveMessages(incoming_bundle) => {
1001 for pending in incoming_bundle.messages() {
1002 if let Message::User { application_id, .. } = &pending.message {
1003 mandatory.remove(application_id);
1004 }
1005 }
1006 }
1007 }
1008 }
1009 ensure!(
1010 mandatory.is_empty(),
1011 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1012 );
1013 Ok(())
1014 }
1015
1016 #[instrument(skip_all, fields(
1021 chain_id = %self.chain_id(),
1022 next_block_height = %self.tip_state.get().next_block_height,
1023 ))]
1024 pub async fn block_hashes(
1025 &self,
1026 heights: impl IntoIterator<Item = BlockHeight>,
1027 ) -> Result<Vec<CryptoHash>, ChainError> {
1028 let next_height = self.tip_state.get().next_block_height;
1029 let (confirmed_heights, unconfirmed_heights) = heights
1031 .into_iter()
1032 .partition::<Vec<_>, _>(|height| *height < next_height);
1033 let confirmed_indices = confirmed_heights
1034 .into_iter()
1035 .map(|height| usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow))
1036 .collect::<Result<_, _>>()?;
1037 let confirmed_hashes = self.confirmed_log.multi_get(confirmed_indices).await?;
1038 let unconfirmed_hashes = self
1040 .preprocessed_blocks
1041 .multi_get(&unconfirmed_heights)
1042 .await?;
1043 Ok(confirmed_hashes
1044 .into_iter()
1045 .chain(unconfirmed_hashes)
1046 .flatten()
1047 .collect())
1048 }
1049
1050 fn reset_chain_manager(
1052 &mut self,
1053 next_height: BlockHeight,
1054 local_time: Timestamp,
1055 ) -> Result<(), ChainError> {
1056 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1057 let ownership = self.execution_state.system.ownership.get().clone();
1058 let fallback_owners =
1059 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1060 self.pending_validated_blobs.clear();
1061 self.pending_proposed_blobs.clear();
1062 self.manager
1063 .reset(ownership, next_height, local_time, fallback_owners)
1064 }
1065
1066 #[instrument(skip_all, fields(
1070 chain_id = %self.chain_id(),
1071 block_height = %block.header.height
1072 ))]
1073 async fn process_outgoing_messages(
1074 &mut self,
1075 block: &Block,
1076 ) -> Result<Vec<ChainId>, ChainError> {
1077 let recipients = block.recipients();
1080 let block_height = block.header.height;
1081 let next_height = self.tip_state.get().next_block_height;
1082
1083 let outbox_counters = self.outbox_counters.get_mut();
1085 let nonempty_outboxes = self.nonempty_outboxes.get_mut();
1086 let targets = recipients.into_iter().collect::<Vec<_>>();
1087 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1088 for (mut outbox, target) in outboxes.into_iter().zip(&targets) {
1089 if block_height > next_height {
1090 if *outbox.next_height_to_schedule.get() > block_height {
1093 continue; }
1095 let maybe_prev_hash = match outbox.next_height_to_schedule.get().try_sub_one().ok()
1096 {
1097 Some(height) if height < next_height => {
1100 let index =
1101 usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
1102 Some(self.confirmed_log.get(index).await?.ok_or_else(|| {
1103 ChainError::InternalError("missing entry in confirmed_log".into())
1104 })?)
1105 }
1106 Some(height) => Some(self.preprocessed_blocks.get(&height).await?.ok_or_else(
1109 || ChainError::InternalError("missing entry in preprocessed_blocks".into()),
1110 )?),
1111 None => None, };
1113 match (
1115 maybe_prev_hash,
1116 block.body.previous_message_blocks.get(target),
1117 ) {
1118 (None, None) => {
1119 }
1122 (Some(_), None) => {
1123 return Err(ChainError::InternalError(
1126 "block indicates no previous message block,\
1127 but we have one in the outbox"
1128 .into(),
1129 ));
1130 }
1131 (None, Some((_, prev_msg_block_height))) => {
1132 if *prev_msg_block_height >= next_height {
1137 continue;
1138 }
1139 }
1140 (Some(ref prev_hash), Some((prev_msg_block_hash, _))) => {
1141 if prev_hash != prev_msg_block_hash {
1143 continue;
1144 }
1145 }
1146 }
1147 }
1148 if outbox.schedule_message(block_height)? {
1149 *outbox_counters.entry(block_height).or_default() += 1;
1150 nonempty_outboxes.insert(*target);
1151 }
1152 }
1153
1154 #[cfg(with_metrics)]
1155 metrics::NUM_OUTBOXES
1156 .with_label_values(&[])
1157 .observe(self.outboxes.count().await? as f64);
1158 Ok(targets)
1159 }
1160}
1161
/// Checks that `EMPTY_BLOCK_SIZE` matches the BCS-serialized size of a first block
/// with a default execution outcome.
#[test]
fn empty_block_size() {
    let chain_id = linera_execution::test_utils::dummy_chain_description(0).id();
    let proposed_block = crate::test::make_first_block(chain_id);
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let size = bcs::serialized_size(&block).unwrap();
    assert_eq!(size, EMPTY_BLOCK_SIZE);
}