1#[cfg(with_metrics)]
5use std::sync::LazyLock;
6use std::{
7 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
8 sync::Arc,
9};
10
11use async_graphql::SimpleObject;
12use futures::stream::{self, StreamExt, TryStreamExt};
13use linera_base::{
14 crypto::{CryptoHash, ValidatorPublicKey},
15 data_types::{
16 Amount, ApplicationDescription, ApplicationPermissions, ArithmeticError, Blob, BlockHeight,
17 OracleResponse, Timestamp,
18 },
19 ensure,
20 identifiers::{
21 AccountOwner, ApplicationId, BlobType, ChainId, ChannelFullName, Destination,
22 GenericApplicationId, MessageId,
23 },
24 ownership::ChainOwnership,
25};
26use linera_execution::{
27 committee::{Committee, Epoch},
28 system::OpenChainConfig,
29 ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation,
30 OperationContext, OutgoingMessage, Query, QueryContext, QueryOutcome, ResourceController,
31 ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
32};
33use linera_views::{
34 context::Context,
35 log_view::LogView,
36 map_view::MapView,
37 queue_view::QueueView,
38 reentrant_collection_view::ReentrantCollectionView,
39 register_view::RegisterView,
40 set_view::SetView,
41 views::{ClonableView, CryptoHashView, RootView, View},
42};
43use serde::{Deserialize, Serialize};
44
45use crate::{
46 block::{Block, ConfirmedBlock},
47 data_types::{
48 BlockExecutionOutcome, ChainAndHeight, IncomingBundle, MessageAction, MessageBundle,
49 OperationResult, Origin, PostedMessage, ProposedBlock, Target, Transaction,
50 },
51 inbox::{Cursor, InboxError, InboxStateView},
52 manager::ChainManager,
53 outbox::OutboxStateView,
54 pending_blobs::PendingBlobsView,
55 ChainError, ChainExecutionContext, ExecutionResultExt,
56};
57
58#[cfg(test)]
59#[path = "unit_tests/chain_tests.rs"]
60mod chain_tests;
61
62#[cfg(with_metrics)]
63use {
64 linera_base::prometheus_util::{
65 exponential_bucket_interval, exponential_bucket_latencies, register_histogram_vec,
66 register_int_counter_vec, MeasureLatency,
67 },
68 prometheus::{HistogramVec, IntCounterVec},
69};
70
/// Counter of executed blocks; incremented by `track_block_metrics`.
#[cfg(with_metrics)]
static NUM_BLOCKS_EXECUTED: LazyLock<IntCounterVec> = LazyLock::new(|| {
    let help = "Number of blocks executed";
    register_int_counter_vec("num_blocks_executed", help, &[])
});

/// Histogram of the latency of a whole block execution.
#[cfg(with_metrics)]
static BLOCK_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_latencies(1000.0);
    register_histogram_vec(
        "block_execution_latency",
        "Block execution latency",
        &[],
        buckets,
    )
});

/// Histogram of the latency of executing a single incoming message.
#[cfg(with_metrics)]
static MESSAGE_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_latencies(50.0);
    register_histogram_vec(
        "message_execution_latency",
        "Message execution latency",
        &[],
        buckets,
    )
});

/// Histogram of the latency of executing a single operation.
#[cfg(with_metrics)]
static OPERATION_EXECUTION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_latencies(50.0);
    register_histogram_vec(
        "operation_execution_latency",
        "Operation execution latency",
        &[],
        buckets,
    )
});

/// Histogram of the Wasm fuel consumed per block.
#[cfg(with_metrics)]
static WASM_FUEL_USED_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(10.0, 1_000_000.0);
    register_histogram_vec(
        "wasm_fuel_used_per_block",
        "Wasm fuel used per block",
        &[],
        buckets,
    )
});

/// Histogram of the number of read operations per block.
#[cfg(with_metrics)]
static WASM_NUM_READS_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(0.1, 100.0);
    register_histogram_vec(
        "wasm_num_reads_per_block",
        "Wasm number of reads per block",
        &[],
        buckets,
    )
});

/// Histogram of the number of bytes read per block.
#[cfg(with_metrics)]
static WASM_BYTES_READ_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(0.1, 10_000_000.0);
    register_histogram_vec(
        "wasm_bytes_read_per_block",
        "Wasm number of bytes read per block",
        &[],
        buckets,
    )
});

/// Histogram of the number of bytes written per block.
#[cfg(with_metrics)]
static WASM_BYTES_WRITTEN_PER_BLOCK: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(0.1, 10_000_000.0);
    register_histogram_vec(
        "wasm_bytes_written_per_block",
        "Wasm number of bytes written per block",
        &[],
        buckets,
    )
});

/// Histogram of the time spent recomputing the execution state hash.
#[cfg(with_metrics)]
static STATE_HASH_COMPUTATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_latencies(10.0);
    register_histogram_vec(
        "state_hash_computation_latency",
        "Time to recompute the state hash",
        &[],
        buckets,
    )
});

/// Histogram of the number of inboxes, sampled whenever bundles are received or removed.
#[cfg(with_metrics)]
static NUM_INBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(1.0, 10_000.0);
    register_histogram_vec("num_inboxes", "Number of inboxes", &[], buckets)
});

/// Histogram of the number of outboxes, sampled whenever messages are marked as received.
#[cfg(with_metrics)]
static NUM_OUTBOXES: LazyLock<HistogramVec> = LazyLock::new(|| {
    let buckets = exponential_bucket_interval(1.0, 10_000.0);
    register_histogram_vec("num_outboxes", "Number of outboxes", &[], buckets)
});
175
/// Baseline size, in bytes, that `execute_block_inner` charges against the block-size limit
/// before any blob or transaction is accounted for.
// NOTE(review): presumably the serialized size of an empty block — confirm before changing.
const EMPTY_BLOCK_SIZE: usize = 94;
178
/// A reference to a bundle in an inbox, paired with a timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, async_graphql::SimpleObject)]
pub struct TimestampedBundleInInbox {
    /// The origin and cursor identifying the bundle.
    pub entry: BundleInInbox,
    /// The local time recorded when the bundle was added to the inbox
    /// (set from `local_time` in `receive_message_bundle`).
    pub seen: Timestamp,
}
187
/// Identifies a message bundle inside an inbox by its origin and cursor.
#[derive(
    Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, async_graphql::SimpleObject,
)]
pub struct BundleInInbox {
    /// The origin whose inbox holds the bundle.
    pub origin: Origin,
    /// The cursor derived from the bundle (see `Cursor::from`).
    pub cursor: Cursor,
}
198
199impl BundleInInbox {
200 fn new(origin: Origin, bundle: &MessageBundle) -> Self {
201 BundleInInbox {
202 cursor: Cursor::from(bundle),
203 origin,
204 }
205 }
206}
207
/// A view accessing the persistent state of a chain.
///
/// The field order is significant for the derived view implementation; do not reorder fields.
#[derive(Debug, RootView, ClonableView, SimpleObject)]
#[graphql(cache_control(no_cache))]
pub struct ChainStateView<C>
where
    C: Clone + Context + Send + Sync + 'static,
{
    /// The execution state, including the system state and user applications.
    pub execution_state: ExecutionStateView<C>,
    /// The hash of the execution state, as last recorded by chain initialization or by
    /// applying a confirmed block.
    pub execution_state_hash: RegisterView<C, Option<CryptoHash>>,

    /// The tip of the chain: last block hash, next height and cumulative counters.
    pub tip_state: RegisterView<C, ChainTipState>,

    /// The consensus state; reset whenever the chain is initialized or a block is confirmed.
    pub manager: ChainManager<C>,
    /// Pending blobs for validated blocks; cleared when the chain manager is reset.
    pub pending_validated_blobs: PendingBlobsView<C>,
    /// Pending blobs for proposed blocks, keyed by account owner; cleared when the chain
    /// manager is reset.
    pub pending_proposed_blobs: ReentrantCollectionView<C, AccountOwner, PendingBlobsView<C>>,

    /// The hashes of all confirmed blocks, in order; the entry at index `h` is looked up with
    /// a block height `h`.
    pub confirmed_log: LogView<C, CryptoHash>,
    /// The sender chain and height of every received bundle that was asked to be logged.
    pub received_log: LogView<C, ChainAndHeight>,
    /// Per-validator trackers; updates never move a tracker backward.
    pub received_certificate_trackers: RegisterView<C, HashMap<ValidatorPublicKey, u64>>,

    /// The inboxes, one per origin.
    pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
    /// The non-skippable bundles that were newly added to an inbox, in arrival order.
    pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
    /// Non-skippable bundles that were removed from their inbox but could not yet be popped
    /// from the front of `unskippable_bundles`.
    pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
    /// For each recipient chain, the height of the latest confirmed block that sent it a
    /// message.
    pub previous_message_blocks: MapView<C, ChainId, BlockHeight>,
    /// The outboxes, one per target.
    pub outboxes: ReentrantCollectionView<C, Target, OutboxStateView<C>>,
    /// Per block height, a counter that is decremented each time an outbox reports that height
    /// as received; entries are removed when they reach zero.
    pub outbox_counters: RegisterView<C, BTreeMap<BlockHeight, u32>>,
    /// The channel states, one per channel.
    pub channels: ReentrantCollectionView<C, ChannelFullName, ChannelStateView<C>>,
}
255
/// Block-chaining state: where the chain currently ends and how much it has processed so far.
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, SimpleObject)]
pub struct ChainTipState {
    /// Hash of the latest confirmed block, or `None` if there is none yet.
    pub block_hash: Option<CryptoHash>,
    /// Height expected for the next block.
    pub next_block_height: BlockHeight,
    /// Cumulative number of incoming bundles over all confirmed blocks.
    pub num_incoming_bundles: u32,
    /// Cumulative number of operations over all confirmed blocks.
    pub num_operations: u32,
    /// Cumulative number of outgoing messages over all confirmed blocks.
    pub num_outgoing_messages: u32,
}
270
271impl ChainTipState {
272 pub fn verify_block_chaining(&self, new_block: &ProposedBlock) -> Result<(), ChainError> {
275 ensure!(
276 new_block.height == self.next_block_height,
277 ChainError::UnexpectedBlockHeight {
278 expected_block_height: self.next_block_height,
279 found_block_height: new_block.height
280 }
281 );
282 ensure!(
283 new_block.previous_block_hash == self.block_hash,
284 ChainError::UnexpectedPreviousBlockHash
285 );
286 Ok(())
287 }
288
289 pub fn already_validated_block(&self, height: BlockHeight) -> Result<bool, ChainError> {
292 ensure!(
293 self.next_block_height >= height,
294 ChainError::MissingEarlierBlocks {
295 current_block_height: self.next_block_height,
296 }
297 );
298 Ok(self.next_block_height > height)
299 }
300
301 pub fn is_first_block(&self) -> bool {
303 self.next_block_height == BlockHeight::ZERO
304 }
305
306 pub fn update_counters(
308 &mut self,
309 incoming_bundles: &[IncomingBundle],
310 operations: &[Operation],
311 messages: &[Vec<OutgoingMessage>],
312 ) -> Result<(), ChainError> {
313 let num_incoming_bundles =
314 u32::try_from(incoming_bundles.len()).map_err(|_| ArithmeticError::Overflow)?;
315 self.num_incoming_bundles = self
316 .num_incoming_bundles
317 .checked_add(num_incoming_bundles)
318 .ok_or(ArithmeticError::Overflow)?;
319
320 let num_operations =
321 u32::try_from(operations.len()).map_err(|_| ArithmeticError::Overflow)?;
322 self.num_operations = self
323 .num_operations
324 .checked_add(num_operations)
325 .ok_or(ArithmeticError::Overflow)?;
326
327 let num_outgoing_messages = u32::try_from(messages.iter().map(Vec::len).sum::<usize>())
328 .map_err(|_| ArithmeticError::Overflow)?;
329 self.num_outgoing_messages = self
330 .num_outgoing_messages
331 .checked_add(num_outgoing_messages)
332 .ok_or(ArithmeticError::Overflow)?;
333
334 Ok(())
335 }
336}
337
/// The state of a channel followed by subscribers.
#[derive(Debug, ClonableView, View, SimpleObject)]
pub struct ChannelStateView<C>
where
    C: Context + Send + Sync,
{
    /// The chains currently subscribed to the channel.
    pub subscribers: SetView<C, ChainId>,
    /// A log of block heights associated with the channel.
    // NOTE(review): presumably the heights of blocks that posted to the channel — confirm.
    pub block_heights: LogView<C, BlockHeight>,
}
349
350impl<C> ChainStateView<C>
351where
352 C: Context + Clone + Send + Sync + 'static,
353 C::Extra: ExecutionRuntimeContext,
354{
355 pub fn chain_id(&self) -> ChainId {
357 self.context().extra().chain_id()
358 }
359
360 pub async fn query_application(
361 &mut self,
362 local_time: Timestamp,
363 query: Query,
364 service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>,
365 ) -> Result<QueryOutcome, ChainError> {
366 let context = QueryContext {
367 chain_id: self.chain_id(),
368 next_block_height: self.tip_state.get().next_block_height,
369 local_time,
370 };
371 self.execution_state
372 .query_application(context, query, service_runtime_endpoint)
373 .await
374 .with_execution_context(ChainExecutionContext::Query)
375 }
376
377 pub async fn describe_application(
378 &mut self,
379 application_id: ApplicationId,
380 ) -> Result<ApplicationDescription, ChainError> {
381 self.execution_state
382 .system
383 .describe_application(application_id, None)
384 .await
385 .with_execution_context(ChainExecutionContext::DescribeApplication)
386 }
387
388 pub async fn mark_messages_as_received(
389 &mut self,
390 target: &Target,
391 height: BlockHeight,
392 ) -> Result<bool, ChainError> {
393 let mut outbox = self.outboxes.try_load_entry_mut(target).await?;
394 let updates = outbox.mark_messages_as_received(height).await?;
395 if updates.is_empty() {
396 return Ok(false);
397 }
398 for update in updates {
399 let counter = self
400 .outbox_counters
401 .get_mut()
402 .get_mut(&update)
403 .expect("message counter should be present");
404 *counter = counter
405 .checked_sub(1)
406 .expect("message counter should not underflow");
407 if *counter == 0 {
408 self.outbox_counters.get_mut().remove(&update);
410 }
411 }
412 if outbox.queue.count() == 0 {
413 self.outboxes.remove_entry(target)?;
414 }
415 #[cfg(with_metrics)]
416 NUM_OUTBOXES
417 .with_label_values(&[])
418 .observe(self.outboxes.count().await? as f64);
419 Ok(true)
420 }
421
422 pub fn all_messages_delivered_up_to(&self, height: BlockHeight) -> bool {
425 tracing::debug!(
426 "Messages left in {:.8}'s outbox: {:?}",
427 self.chain_id(),
428 self.outbox_counters.get()
429 );
430 if let Some((key, _)) = self.outbox_counters.get().first_key_value() {
431 key > &height
432 } else {
433 true
434 }
435 }
436
437 pub fn is_active(&self) -> bool {
439 self.execution_state.system.is_active()
440 }
441
442 pub fn ensure_is_active(&self) -> Result<(), ChainError> {
444 if self.is_active() {
445 Ok(())
446 } else {
447 Err(ChainError::InactiveChain(self.chain_id()))
448 }
449 }
450
451 pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
454 let chain_id = self.chain_id();
455 let pairs = self.inboxes.try_load_all_entries().await?;
456 let max_stream_queries = self.context().max_stream_queries();
457 let stream = stream::iter(pairs)
458 .map(|(origin, inbox)| async move {
459 if let Some(bundle) = inbox.removed_bundles.front().await? {
460 return Err(ChainError::MissingCrossChainUpdate {
461 chain_id,
462 origin: origin.into(),
463 height: bundle.height,
464 });
465 }
466 Ok::<(), ChainError>(())
467 })
468 .buffer_unordered(max_stream_queries);
469 stream.try_collect::<Vec<_>>().await?;
470 Ok(())
471 }
472
473 pub async fn next_block_height_to_receive(
474 &self,
475 origin: &Origin,
476 ) -> Result<BlockHeight, ChainError> {
477 let inbox = self.inboxes.try_load_entry(origin).await?;
478 match inbox {
479 Some(inbox) => inbox.next_block_height_to_receive(),
480 None => Ok(BlockHeight::from(0)),
481 }
482 }
483
484 pub async fn last_anticipated_block_height(
485 &self,
486 origin: &Origin,
487 ) -> Result<Option<BlockHeight>, ChainError> {
488 let inbox = self.inboxes.try_load_entry(origin).await?;
489 match inbox {
490 Some(inbox) => match inbox.removed_bundles.back().await? {
491 Some(bundle) => Ok(Some(bundle.height)),
492 None => Ok(None),
493 },
494 None => Ok(None),
495 }
496 }
497
498 pub async fn receive_message_bundle(
505 &mut self,
506 origin: &Origin,
507 bundle: MessageBundle,
508 local_time: Timestamp,
509 add_to_received_log: bool,
510 ) -> Result<(), ChainError> {
511 assert!(!bundle.messages.is_empty());
512 let chain_id = self.chain_id();
513 tracing::trace!(
514 "Processing new messages to {chain_id:.8} from {origin} at height {}",
515 bundle.height,
516 );
517 let chain_and_height = ChainAndHeight {
518 chain_id: origin.sender,
519 height: bundle.height,
520 };
521
522 for posted_message in &bundle.messages {
524 if let Some(config) = posted_message.message.matches_open_chain() {
525 if self.execution_state.system.description.get().is_none() {
526 let message_id = chain_and_height.to_message_id(posted_message.index);
527 self.execute_init_message(message_id, config, bundle.timestamp, local_time)
528 .await?;
529 }
530 }
531 }
532 let mut inbox = self.inboxes.try_load_entry_mut(origin).await?;
534 #[cfg(with_metrics)]
535 NUM_INBOXES
536 .with_label_values(&[])
537 .observe(self.inboxes.count().await? as f64);
538 let entry = BundleInInbox::new(origin.clone(), &bundle);
539 let skippable = bundle.is_skippable();
540 let newly_added = inbox
541 .add_bundle(bundle)
542 .await
543 .map_err(|error| match error {
544 InboxError::ViewError(error) => ChainError::ViewError(error),
545 error => ChainError::InternalError(format!(
546 "while processing messages in certified block: {error}"
547 )),
548 })?;
549 if newly_added && !skippable {
550 let seen = local_time;
551 self.unskippable_bundles
552 .push_back(TimestampedBundleInInbox { entry, seen });
553 }
554
555 if add_to_received_log {
557 self.received_log.push(chain_and_height);
558 }
559 Ok(())
560 }
561
562 pub fn update_received_certificate_trackers(
564 &mut self,
565 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
566 ) {
567 for (name, tracker) in new_trackers {
568 self.received_certificate_trackers
569 .get_mut()
570 .entry(name)
571 .and_modify(|t| {
572 if tracker > *t {
575 *t = tracker;
576 }
577 })
578 .or_insert(tracker);
579 }
580 }
581
582 pub async fn execute_init_message_from(
584 &mut self,
585 block: &Block,
586 local_time: Timestamp,
587 ) -> Result<(), ChainError> {
588 let (in_bundle, posted_message, config) = block
589 .starts_with_open_chain_message()
590 .ok_or_else(|| ChainError::InactiveChain(block.header.chain_id))?;
591 if self.is_active() {
592 return Ok(()); }
594 let message_id = MessageId {
595 chain_id: in_bundle.origin.sender,
596 height: in_bundle.bundle.height,
597 index: posted_message.index,
598 };
599 self.execute_init_message(message_id, config, block.header.timestamp, local_time)
600 .await
601 }
602
603 async fn execute_init_message(
605 &mut self,
606 message_id: MessageId,
607 config: &OpenChainConfig,
608 timestamp: Timestamp,
609 local_time: Timestamp,
610 ) -> Result<(), ChainError> {
611 self.execution_state
613 .system
614 .initialize_chain(message_id, timestamp, config.clone());
615 let hash = self.execution_state.crypto_hash().await?;
617 self.execution_state_hash.set(Some(hash));
618 let maybe_committee = self.execution_state.system.current_committee().into_iter();
619 self.manager.reset(
621 self.execution_state.system.ownership.get().clone(),
622 BlockHeight(0),
623 local_time,
624 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights()),
625 )
626 }
627
628 pub fn current_committee(&self) -> Result<(Epoch, &Committee), ChainError> {
629 self.execution_state
630 .system
631 .current_committee()
632 .ok_or_else(|| ChainError::InactiveChain(self.chain_id()))
633 }
634
635 pub fn ownership(&self) -> &ChainOwnership {
636 self.execution_state.system.ownership.get()
637 }
638
/// Removes the given incoming bundles from their inboxes.
///
/// `timestamp` is the timestamp of the block that processes the bundles; every bundle must
/// have been created at or before it. Non-skippable bundles that were present in their inbox
/// are also cleaned out of the `unskippable_bundles` queue, either immediately (when they are
/// at its front) or lazily via `removed_unskippable_bundles`.
///
/// # Errors
///
/// * `ChainError::IncorrectBundleTimestamp` if a bundle is newer than `timestamp`.
/// * Inbox errors, converted together with the chain ID and origin.
pub async fn remove_bundles_from_inboxes(
    &mut self,
    timestamp: Timestamp,
    incoming_bundles: &[IncomingBundle],
) -> Result<(), ChainError> {
    let chain_id = self.chain_id();
    // Group the bundles by origin so that each inbox is loaded only once.
    let mut bundles_by_origin: BTreeMap<_, Vec<&MessageBundle>> = Default::default();
    for IncomingBundle { bundle, origin, .. } in incoming_bundles {
        ensure!(
            bundle.timestamp <= timestamp,
            ChainError::IncorrectBundleTimestamp {
                chain_id,
                bundle_timestamp: bundle.timestamp,
                block_timestamp: timestamp,
            }
        );
        let bundles = bundles_by_origin.entry(origin).or_default();
        bundles.push(bundle);
    }
    let origins = bundles_by_origin.keys().copied();
    // NOTE(review): the `zip` below assumes the inboxes are returned in the same order as
    // `origins` — confirm against `try_load_entries_mut`.
    let inboxes = self.inboxes.try_load_entries_mut(origins).await?;
    let mut removed_unskippable = HashSet::new();
    for ((origin, bundles), mut inbox) in bundles_by_origin.into_iter().zip(inboxes) {
        tracing::trace!(
            "Removing {:?} from {chain_id:.8}'s inbox for {origin:}",
            bundles
                .iter()
                .map(|bundle| bundle.height)
                .collect::<Vec<_>>()
        );
        for bundle in bundles {
            // Mark the bundle as processed in the inbox.
            let was_present = inbox
                .remove_bundle(bundle)
                .await
                .map_err(|error| ChainError::from((chain_id, origin.clone(), error)))?;
            if was_present && !bundle.is_skippable() {
                removed_unskippable.insert(BundleInInbox::new(origin.clone(), bundle));
            }
        }
    }
    if !removed_unskippable.is_empty() {
        // Pop removed bundles from the front of the unskippable queue, but only if the
        // current front is one of the bundles removed just now.
        let maybe_front = self.unskippable_bundles.front().await?;
        if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
            self.unskippable_bundles.delete_front();
            while let Some(ts_entry) = self.unskippable_bundles.front().await? {
                if !removed_unskippable.remove(&ts_entry.entry) {
                    // Not removed just now: keep popping only if it was recorded as removed
                    // by an earlier call, otherwise stop at the first bundle still pending.
                    if !self
                        .removed_unskippable_bundles
                        .contains(&ts_entry.entry)
                        .await?
                    {
                        break;
                    }
                    self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
                }
                self.unskippable_bundles.delete_front();
            }
        }
        // Whatever could not be popped from the queue is remembered for a later call.
        for entry in removed_unskippable {
            self.removed_unskippable_bundles.insert(&entry)?;
        }
    }
    #[cfg(with_metrics)]
    NUM_INBOXES
        .with_label_values(&[])
        .observe(self.inboxes.count().await? as f64);
    Ok(())
}
710
/// Executes `block` on the execution state `chain` and returns the execution outcome together
/// with the channel subscriptions and unsubscriptions requested by its transactions.
///
/// * `confirmed_log` and `previous_message_blocks_view` are only read, to find for each
///   recipient the hash of the previous block that sent it a message.
/// * `round` is forwarded to the operation and message contexts.
/// * `published_blobs` must contain exactly the blobs published by `block`.
/// * `replaying_oracle_responses`, if present, provides one list of oracle responses per
///   transaction.
///
/// # Errors
///
/// Among others: `InvalidBlockTimestamp` if the block is older than the chain's timestamp,
/// `InactiveChain` if there is no current committee, `InternalError` on a published-blob
/// mismatch, `ClosedChain` if the chain is closed and the block does anything other than
/// reject incoming bundles, `MissingOracleResponseList` if the replayed oracle responses run
/// out, and any execution or resource-limit error, tagged with its execution context.
///
/// # Panics
///
/// Panics if `block.chain_id` differs from the chain ID of `chain`, or if the number of
/// collected per-transaction outcomes does not match the number of transactions.
#[expect(clippy::too_many_arguments)]
pub async fn execute_block_inner(
    chain: &mut ExecutionStateView<C>,
    confirmed_log: &LogView<C, CryptoHash>,
    previous_message_blocks_view: &MapView<C, ChainId, BlockHeight>,
    block: &ProposedBlock,
    local_time: Timestamp,
    round: Option<u32>,
    published_blobs: &[Blob],
    replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
) -> Result<
    (
        BlockExecutionOutcome,
        Vec<(ChannelFullName, ChainId)>,
        Vec<(ChannelFullName, ChainId)>,
    ),
    ChainError,
> {
    #[cfg(with_metrics)]
    let _execution_latency = BLOCK_EXECUTION_LATENCY.measure_latency();

    assert_eq!(block.chain_id, chain.context().extra().chain_id());

    // Block timestamps must never go backward.
    ensure!(
        *chain.system.timestamp.get() <= block.timestamp,
        ChainError::InvalidBlockTimestamp
    );
    chain.system.timestamp.set(block.timestamp);
    let (_, committee) = chain
        .system
        .current_committee()
        .ok_or_else(|| ChainError::InactiveChain(block.chain_id))?;
    let mut resource_controller = ResourceController {
        policy: Arc::new(committee.policy().clone()),
        tracker: ResourceTracker::default(),
        account: block.authenticated_signer,
    };
    ensure!(
        block.published_blob_ids()
            == published_blobs
                .iter()
                .map(|blob| blob.id())
                .collect::<BTreeSet<_>>(),
        ChainError::InternalError("published_blobs mismatch".to_string())
    );
    resource_controller
        .track_block_size(EMPTY_BLOCK_SIZE)
        .with_execution_context(ChainExecutionContext::Block)?;
    for blob in published_blobs {
        let blob_type = blob.content().blob_type();
        // Only these blob types are charged for publication; every published blob is
        // recorded as used regardless of its type.
        if blob_type == BlobType::Data
            || blob_type == BlobType::ContractBytecode
            || blob_type == BlobType::ServiceBytecode
            || blob_type == BlobType::EvmBytecode
        {
            resource_controller
                .with_state(&mut chain.system)
                .await?
                .track_blob_published(blob.content())
                .with_execution_context(ChainExecutionContext::Block)?;
        }
        chain.system.used_blobs.insert(&blob.id())?;
    }

    // A closed chain only accepts non-empty blocks that do nothing but reject messages.
    if *chain.system.closed.get() {
        ensure!(
            !block.incoming_bundles.is_empty() && block.has_only_rejected_messages(),
            ChainError::ClosedChain
        );
    }
    Self::check_app_permissions(chain.system.application_permissions.get(), block)?;

    // One list of oracle responses is consumed per transaction when replaying.
    let mut replaying_oracle_responses = replaying_oracle_responses.map(Vec::into_iter);
    // These indices are threaded through the transactions via the transaction tracker.
    let mut next_message_index = 0;
    let mut next_application_index = 0;
    let mut oracle_responses = Vec::new();
    let mut events = Vec::new();
    let mut blobs = Vec::new();
    let mut messages = Vec::new();
    let mut operation_results = Vec::new();
    let mut subscribe = Vec::new();
    let mut unsubscribe = Vec::new();
    for (txn_index, transaction) in block.transactions() {
        let chain_execution_context = match transaction {
            Transaction::ReceiveMessages(_) => ChainExecutionContext::IncomingBundle(txn_index),
            Transaction::ExecuteOperation(_) => ChainExecutionContext::Operation(txn_index),
        };
        let maybe_responses = match replaying_oracle_responses.as_mut().map(Iterator::next) {
            Some(Some(responses)) => Some(responses),
            Some(None) => return Err(ChainError::MissingOracleResponseList),
            None => None,
        };
        let mut txn_tracker = TransactionTracker::new(
            local_time,
            txn_index,
            next_message_index,
            next_application_index,
            maybe_responses,
        );
        match transaction {
            Transaction::ReceiveMessages(incoming_bundle) => {
                resource_controller
                    .track_block_size_of(&incoming_bundle)
                    .with_execution_context(chain_execution_context)?;
                for (message_id, posted_message) in incoming_bundle.messages_and_ids() {
                    Box::pin(Self::execute_message_in_block(
                        chain,
                        message_id,
                        posted_message,
                        incoming_bundle,
                        block,
                        round,
                        &mut txn_tracker,
                        &mut resource_controller,
                    ))
                    .await?;
                }
            }
            Transaction::ExecuteOperation(operation) => {
                resource_controller
                    .track_block_size_of(&operation)
                    .with_execution_context(chain_execution_context)?;
                #[cfg(with_metrics)]
                let _operation_latency = OPERATION_EXECUTION_LATENCY.measure_latency();
                let context = OperationContext {
                    chain_id: block.chain_id,
                    height: block.height,
                    index: Some(txn_index),
                    round,
                    authenticated_signer: block.authenticated_signer,
                    authenticated_caller_id: None,
                };
                Box::pin(chain.execute_operation(
                    context,
                    operation.clone(),
                    &mut txn_tracker,
                    &mut resource_controller,
                ))
                .await
                .with_execution_context(chain_execution_context)?;
                resource_controller
                    .with_state(&mut chain.system)
                    .await?
                    .track_operation(operation)
                    .with_execution_context(chain_execution_context)?;
            }
        }

        let txn_outcome = txn_tracker
            .into_outcome()
            .with_execution_context(chain_execution_context)?;
        next_message_index = txn_outcome.next_message_index;
        next_application_index = txn_outcome.next_application_index;

        subscribe.extend(txn_outcome.subscribe);
        unsubscribe.extend(txn_outcome.unsubscribe);

        // Outgoing messages are charged for operations and accepted bundles only, not for
        // rejected bundles.
        if matches!(
            transaction,
            Transaction::ExecuteOperation(_)
                | Transaction::ReceiveMessages(IncomingBundle {
                    action: MessageAction::Accept,
                    ..
                })
        ) {
            for message_out in &txn_outcome.outgoing_messages {
                resource_controller
                    .with_state(&mut chain.system)
                    .await?
                    .track_message(&message_out.message)
                    .with_execution_context(chain_execution_context)?;
            }
        }

        // Everything the transaction adds to the block counts toward the block size.
        resource_controller
            .track_block_size_of(&(
                &txn_outcome.oracle_responses,
                &txn_outcome.outgoing_messages,
                &txn_outcome.events,
                &txn_outcome.blobs,
            ))
            .with_execution_context(chain_execution_context)?;
        for blob in &txn_outcome.blobs {
            if blob.content().blob_type() == BlobType::Data {
                resource_controller
                    .with_state(&mut chain.system)
                    .await?
                    .track_blob_published(blob.content())
                    .with_execution_context(chain_execution_context)?;
            }
        }
        oracle_responses.push(txn_outcome.oracle_responses);
        messages.push(txn_outcome.outgoing_messages);
        events.push(txn_outcome.events);
        blobs.push(txn_outcome.blobs);

        if let Transaction::ExecuteOperation(_) = transaction {
            resource_controller
                .track_block_size_of(&(&txn_outcome.operation_result))
                .with_execution_context(chain_execution_context)?;
            operation_results.push(OperationResult(txn_outcome.operation_result));
        }
    }

    // The per-block charge is skipped on closed chains, which (see the check above) may
    // only reject incoming messages.
    if !chain.system.closed.get() {
        resource_controller
            .with_state(&mut chain.system)
            .await?
            .track_block()
            .with_execution_context(ChainExecutionContext::Block)?;
    }

    // For every recipient of this block, find the hash of the previous block that sent it
    // a message, if any.
    let recipients = messages
        .iter()
        .flatten()
        .flat_map(|message| message.destination.recipient())
        .collect::<BTreeSet<_>>();
    let mut previous_message_blocks = BTreeMap::new();
    for recipient in recipients {
        if let Some(height) = previous_message_blocks_view.get(&recipient).await? {
            let hash = confirmed_log
                .get(usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?)
                .await?
                .ok_or_else(|| {
                    ChainError::InternalError("missing entry in confirmed_log".into())
                })?;
            previous_message_blocks.insert(recipient, hash);
        }
    }

    // Sanity check: exactly one outcome entry per transaction.
    let txn_count = block.incoming_bundles.len() + block.operations.len();
    assert_eq!(oracle_responses.len(), txn_count);
    assert_eq!(messages.len(), txn_count);
    assert_eq!(events.len(), txn_count);
    assert_eq!(blobs.len(), txn_count);

    #[cfg(with_metrics)]
    Self::track_block_metrics(&resource_controller.tracker);

    let state_hash = {
        #[cfg(with_metrics)]
        let _hash_latency = STATE_HASH_COMPUTATION_LATENCY.measure_latency();
        chain.crypto_hash().await?
    };

    let outcome = BlockExecutionOutcome {
        messages,
        previous_message_blocks,
        state_hash,
        oracle_responses,
        events,
        blobs,
        operation_results,
    };

    Ok((outcome, subscribe, unsubscribe))
}
974
975 pub async fn execute_block(
978 &mut self,
979 block: &ProposedBlock,
980 local_time: Timestamp,
981 round: Option<u32>,
982 published_blobs: &[Blob],
983 replaying_oracle_responses: Option<Vec<Vec<OracleResponse>>>,
984 ) -> Result<
985 (
986 BlockExecutionOutcome,
987 Vec<(ChannelFullName, ChainId)>,
988 Vec<(ChannelFullName, ChainId)>,
989 ),
990 ChainError,
991 > {
992 Self::execute_block_inner(
993 &mut self.execution_state,
994 &self.confirmed_log,
995 &self.previous_message_blocks,
996 block,
997 local_time,
998 round,
999 published_blobs,
1000 replaying_oracle_responses,
1001 )
1002 .await
1003 }
1004
1005 pub async fn apply_confirmed_block(
1008 &mut self,
1009 block: &ConfirmedBlock,
1010 local_time: Timestamp,
1011 ) -> Result<(), ChainError> {
1012 let hash = block.inner().hash();
1013 let block = block.inner().inner();
1014 self.execution_state_hash.set(Some(block.header.state_hash));
1015 for txn_messages in &block.body.messages {
1016 self.process_outgoing_messages(block.header.height, txn_messages)
1017 .await?;
1018 }
1019
1020 let recipients = block
1021 .body
1022 .messages
1023 .iter()
1024 .flatten()
1025 .flat_map(|message| message.destination.recipient())
1026 .collect::<BTreeSet<_>>();
1027 for recipient in recipients {
1028 self.previous_message_blocks
1029 .insert(&recipient, block.header.height)?;
1030 }
1031 self.reset_chain_manager(block.header.height.try_add_one()?, local_time)?;
1033
1034 let tip = self.tip_state.get_mut();
1036 tip.block_hash = Some(hash);
1037 tip.next_block_height.try_add_assign_one()?;
1038 tip.update_counters(
1039 &block.body.incoming_bundles,
1040 &block.body.operations,
1041 &block.body.messages,
1042 )?;
1043 self.confirmed_log.push(hash);
1044 Ok(())
1045 }
1046
1047 #[expect(clippy::too_many_arguments)]
1049 async fn execute_message_in_block(
1050 chain: &mut ExecutionStateView<C>,
1051 message_id: MessageId,
1052 posted_message: &PostedMessage,
1053 incoming_bundle: &IncomingBundle,
1054 block: &ProposedBlock,
1055 round: Option<u32>,
1056 txn_tracker: &mut TransactionTracker,
1057 resource_controller: &mut ResourceController<Option<AccountOwner>>,
1058 ) -> Result<(), ChainError> {
1059 #[cfg(with_metrics)]
1060 let _message_latency = MESSAGE_EXECUTION_LATENCY.measure_latency();
1061 let context = MessageContext {
1062 chain_id: block.chain_id,
1063 is_bouncing: posted_message.is_bouncing(),
1064 height: block.height,
1065 round,
1066 certificate_hash: incoming_bundle.bundle.certificate_hash,
1067 message_id,
1068 authenticated_signer: posted_message.authenticated_signer,
1069 refund_grant_to: posted_message.refund_grant_to,
1070 };
1071 let mut grant = posted_message.grant;
1072 match incoming_bundle.action {
1073 MessageAction::Accept => {
1074 let chain_execution_context =
1075 ChainExecutionContext::IncomingBundle(txn_tracker.transaction_index());
1076 ensure!(!chain.system.closed.get(), ChainError::ClosedChain);
1078
1079 Box::pin(chain.execute_message(
1080 context,
1081 posted_message.message.clone(),
1082 (grant > Amount::ZERO).then_some(&mut grant),
1083 txn_tracker,
1084 resource_controller,
1085 ))
1086 .await
1087 .with_execution_context(chain_execution_context)?;
1088 chain
1089 .send_refund(context, grant, txn_tracker)
1090 .await
1091 .with_execution_context(chain_execution_context)?;
1092 }
1093 MessageAction::Reject => {
1094 ensure!(
1097 !posted_message.is_protected() || *chain.system.closed.get(),
1098 ChainError::CannotRejectMessage {
1099 chain_id: block.chain_id,
1100 origin: Box::new(incoming_bundle.origin.clone()),
1101 posted_message: Box::new(posted_message.clone()),
1102 }
1103 );
1104 if posted_message.is_tracked() {
1105 chain
1107 .bounce_message(context, grant, posted_message.message.clone(), txn_tracker)
1108 .await
1109 .with_execution_context(ChainExecutionContext::Block)?;
1110 } else {
1111 chain
1113 .send_refund(context, grant, txn_tracker)
1114 .await
1115 .with_execution_context(ChainExecutionContext::Block)?;
1116 }
1117 }
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn is_child(&self) -> bool {
1124 let Some(description) = self.execution_state.system.description.get() else {
1125 return true;
1127 };
1128 description.is_child()
1129 }
1130
1131 fn check_app_permissions(
1133 app_permissions: &ApplicationPermissions,
1134 block: &ProposedBlock,
1135 ) -> Result<(), ChainError> {
1136 let mut mandatory = HashSet::<ApplicationId>::from_iter(
1137 app_permissions.mandatory_applications.iter().cloned(),
1138 );
1139 for operation in &block.operations {
1140 ensure!(
1141 app_permissions.can_execute_operations(&operation.application_id()),
1142 ChainError::AuthorizedApplications(
1143 app_permissions.execute_operations.clone().unwrap()
1144 )
1145 );
1146 if let Operation::User { application_id, .. } = operation {
1147 mandatory.remove(application_id);
1148 }
1149 }
1150 for pending in block.incoming_messages() {
1151 if mandatory.is_empty() {
1152 break;
1153 }
1154 if let Message::User { application_id, .. } = &pending.message {
1155 mandatory.remove(application_id);
1156 }
1157 }
1158 ensure!(
1159 mandatory.is_empty(),
1160 ChainError::MissingMandatoryApplications(mandatory.into_iter().collect())
1161 );
1162 Ok(())
1163 }
1164
1165 fn reset_chain_manager(
1167 &mut self,
1168 next_height: BlockHeight,
1169 local_time: Timestamp,
1170 ) -> Result<(), ChainError> {
1171 let maybe_committee = self.execution_state.system.current_committee().into_iter();
1172 let ownership = self.execution_state.system.ownership.get().clone();
1173 let fallback_owners =
1174 maybe_committee.flat_map(|(_, committee)| committee.account_keys_and_weights());
1175 self.pending_validated_blobs.clear();
1176 self.pending_proposed_blobs.clear();
1177 self.manager
1178 .reset(ownership, next_height, local_time, fallback_owners)
1179 }
1180
/// Records the per-block metrics taken from the given resource tracker.
///
/// Increments the executed-block counter, then observes the tracker's fuel, read
/// operations, bytes read and bytes written in their respective histograms.
#[cfg(with_metrics)]
fn track_block_metrics(tracker: &ResourceTracker) {
    NUM_BLOCKS_EXECUTED.with_label_values(&[]).inc();

    // Each histogram paired with the tracker value it records, in reporting order.
    let observations = [
        (&*WASM_FUEL_USED_PER_BLOCK, tracker.fuel as f64),
        (&*WASM_NUM_READS_PER_BLOCK, tracker.read_operations as f64),
        (&*WASM_BYTES_READ_PER_BLOCK, tracker.bytes_read as f64),
        (&*WASM_BYTES_WRITTEN_PER_BLOCK, tracker.bytes_written as f64),
    ];
    for (histogram, value) in observations {
        histogram.with_label_values(&[]).observe(value);
    }
}
1198
1199 async fn process_outgoing_messages(
1200 &mut self,
1201 height: BlockHeight,
1202 messages: &[OutgoingMessage],
1203 ) -> Result<(), ChainError> {
1204 let max_stream_queries = self.context().max_stream_queries();
1205 let mut recipients = HashSet::new();
1208 let mut channel_broadcasts = HashSet::new();
1209 for message in messages {
1210 match &message.destination {
1211 Destination::Recipient(id) => {
1212 recipients.insert(*id);
1213 }
1214 Destination::Subscribers(name) => {
1215 ensure!(
1216 message.grant == Amount::ZERO,
1217 ChainError::GrantUseOnBroadcast
1218 );
1219 let GenericApplicationId::User(application_id) =
1220 message.message.application_id()
1221 else {
1222 return Err(ChainError::InternalError(
1223 "System messages cannot be sent to channels".to_string(),
1224 ));
1225 };
1226 channel_broadcasts.insert(ChannelFullName {
1227 application_id,
1228 name: name.clone(),
1229 });
1230 }
1231 }
1232 }
1233
1234 let outbox_counters = self.outbox_counters.get_mut();
1236 let targets = recipients
1237 .into_iter()
1238 .map(Target::chain)
1239 .collect::<Vec<_>>();
1240 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1241 for mut outbox in outboxes {
1242 if outbox.schedule_message(height)? {
1243 *outbox_counters.entry(height).or_default() += 1;
1244 }
1245 }
1246
1247 let full_names = channel_broadcasts.into_iter().collect::<Vec<_>>();
1248 let channels = self.channels.try_load_entries_mut(&full_names).await?;
1249 let stream = full_names.into_iter().zip(channels);
1250 let stream = stream::iter(stream)
1251 .map(|(full_name, mut channel)| async move {
1252 let recipients = channel.subscribers.indices().await?;
1253 channel.block_heights.push(height);
1254 let targets = recipients
1255 .into_iter()
1256 .map(|recipient| Target::channel(recipient, full_name.clone()))
1257 .collect::<Vec<_>>();
1258 Ok::<_, ChainError>(targets)
1259 })
1260 .buffer_unordered(max_stream_queries);
1261 let infos = stream.try_collect::<Vec<_>>().await?;
1262 let targets = infos.into_iter().flatten().collect::<Vec<_>>();
1263 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1264 let outbox_counters = self.outbox_counters.get_mut();
1265 for mut outbox in outboxes {
1266 if outbox.schedule_message(height)? {
1267 *outbox_counters.entry(height).or_default() += 1;
1268 }
1269 }
1270 #[cfg(with_metrics)]
1271 NUM_OUTBOXES
1272 .with_label_values(&[])
1273 .observe(self.outboxes.count().await? as f64);
1274 Ok(())
1275 }
1276
1277 pub async fn process_subscribes(
1280 &mut self,
1281 names_and_ids: Vec<(ChannelFullName, ChainId)>,
1282 ) -> Result<bool, ChainError> {
1283 if names_and_ids.is_empty() {
1284 return Ok(false);
1285 }
1286 let full_names = names_and_ids
1287 .iter()
1288 .map(|(name, _)| name.clone())
1289 .collect::<Vec<_>>();
1290 let channels = self.channels.try_load_entries_mut(&full_names).await?;
1291 let subscribe_channels = names_and_ids.into_iter().zip(channels);
1292 let max_stream_queries = self.context().max_stream_queries();
1293 let stream = stream::iter(subscribe_channels)
1294 .map(|((name, id), mut channel)| async move {
1295 if channel.subscribers.contains(&id).await? {
1296 return Ok(None); }
1298 tracing::trace!("Adding subscriber {id:.8} for {name:}");
1299 channel.subscribers.insert(&id)?;
1300 let heights = channel.block_heights.read(..).await?;
1302 if heights.is_empty() {
1303 return Ok(None); }
1305 let target = Target::channel(id, name.clone());
1306 Ok::<_, ChainError>(Some((target, heights)))
1307 })
1308 .buffer_unordered(max_stream_queries);
1309 let infos = stream.try_collect::<Vec<_>>().await?;
1310 let (targets, heights): (Vec<_>, Vec<_>) = infos.into_iter().flatten().unzip();
1311 let mut new_outbox_entries = false;
1312 let outboxes = self.outboxes.try_load_entries_mut(&targets).await?;
1313 let outbox_counters = self.outbox_counters.get_mut();
1314 for (heights, mut outbox) in heights.into_iter().zip(outboxes) {
1315 for height in heights {
1316 if outbox.schedule_message(height)? {
1317 *outbox_counters.entry(height).or_default() += 1;
1318 new_outbox_entries = true;
1319 }
1320 }
1321 }
1322 #[cfg(with_metrics)]
1323 NUM_OUTBOXES
1324 .with_label_values(&[])
1325 .observe(self.outboxes.count().await? as f64);
1326 Ok(new_outbox_entries)
1327 }
1328
1329 pub async fn process_unsubscribes(
1330 &mut self,
1331 names_and_ids: Vec<(ChannelFullName, ChainId)>,
1332 ) -> Result<(), ChainError> {
1333 if names_and_ids.is_empty() {
1334 return Ok(());
1335 }
1336 let full_names = names_and_ids
1337 .iter()
1338 .map(|(name, _)| name.clone())
1339 .collect::<Vec<_>>();
1340 let channels = self.channels.try_load_entries_mut(&full_names).await?;
1341 for ((_name, id), mut channel) in names_and_ids.into_iter().zip(channels) {
1342 channel.subscribers.remove(&id)?;
1344 }
1345 Ok(())
1346 }
1347}
1348
/// Checks that the BCS-serialized size of a first block on root chain 0 with a
/// default execution outcome equals `EMPTY_BLOCK_SIZE`.
#[test]
fn empty_block_size() {
    let proposed_block = crate::test::make_first_block(ChainId::root(0));
    let outcome = crate::data_types::BlockExecutionOutcome::default();
    let block = crate::block::Block::new(proposed_block, outcome);
    let serialized_len = bcs::serialized_size(&block).unwrap();
    assert_eq!(serialized_len, EMPTY_BLOCK_SIZE);
}