smoldot_light/runtime_service.rs
1// Smoldot
2// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18//! Background runtime download service.
19//!
20//! This service plugs on top of a [`sync_service`], listens for new best blocks and checks
21//! whether the runtime has changed in any way. Its objective is to always provide an up-to-date
22//! [`executor::host::HostVmPrototype`] ready to be called by other services.
23//!
24//! # Usage
25//!
//! The runtime service lets users subscribe to block updates, similar to the [`sync_service`].
27//! These subscriptions are implemented by subscribing to the underlying [`sync_service`] and,
28//! for each notification, checking whether the runtime has changed (thanks to the presence or
29//! absence of a header digest item), and downloading the runtime code if necessary. Therefore,
30//! these notifications might come with a delay compared to directly using the [`sync_service`].
31//!
32//! If it isn't possible to download the runtime code of a block (for example because peers refuse
33//! to answer or have already pruned the block) or if the runtime service already has too many
34//! pending downloads, this block is simply not reported on the subscriptions. The download will
35//! be repeatedly tried until it succeeds.
36//!
37//! Consequently, you are strongly encouraged to not use both the [`sync_service`] *and* the
38//! [`RuntimeService`] of the same chain. They each provide a consistent view of the chain, but
39//! this view isn't necessarily the same on both services.
40//!
41//! The main service offered by the runtime service is [`RuntimeService::subscribe_all`], that
42//! notifies about new blocks once their runtime is known.
43//!
44//! # Blocks pinning
45//!
46//! Blocks that are reported through [`RuntimeService::subscribe_all`] are automatically *pinned*.
47//! If multiple subscriptions exist, each block is pinned once per subscription.
48//!
49//! As long as a block is pinned, the [`RuntimeService`] is guaranteed to keep in its internal
50//! state the runtime of this block and its properties.
51//!
52//! Blocks must be manually unpinned by calling [`Subscription::unpin_block`].
53//! Failing to do so is effectively a memory leak. If the number of pinned blocks becomes too
54//! large, the subscription is force-killed by the [`RuntimeService`].
55//!
56
57use crate::{log, network_service, platform::PlatformRef, sync_service};
58
59use alloc::{
60 borrow::{Cow, ToOwned as _},
61 boxed::Box,
62 collections::{BTreeMap, VecDeque},
63 format,
64 string::{String, ToString as _},
65 sync::{Arc, Weak},
66 vec::Vec,
67};
68use async_lock::Mutex;
69use core::{cmp, fmt, iter, mem, num::NonZero, ops, pin::Pin, time::Duration};
70use derive_more::derive;
71use futures_channel::oneshot;
72use futures_lite::FutureExt as _;
73use futures_util::{Stream, StreamExt as _, future, stream};
74use itertools::Itertools as _;
75use rand::seq::IteratorRandom as _;
76use rand_chacha::rand_core::SeedableRng as _;
77use smoldot::{
78 chain::async_tree,
79 executor, header,
80 informant::{BytesDisplay, HashDisplay},
81 trie::{self, Nibble, proof_decode},
82};
83
84/// Configuration for a runtime service.
85pub struct Config<TPlat: PlatformRef> {
86 /// Name of the chain, for logging purposes.
87 ///
88 /// > **Note**: This name will be directly printed out. Any special character should already
89 /// > have been filtered out from this name.
90 pub log_name: String,
91
92 /// Access to the platform's capabilities.
93 pub platform: TPlat,
94
95 /// Service responsible for synchronizing the chain.
96 pub sync_service: Arc<sync_service::SyncService<TPlat>>,
97
98 /// Service responsible for accessing the networking of the chain.
99 pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
100
101 /// Header of the genesis block of the chain, in SCALE encoding.
102 pub genesis_block_scale_encoded_header: Vec<u8>,
103}
104
105/// Runtime currently pinned within a [`RuntimeService`].
106///
107/// Destroying this object automatically unpins the runtime.
108#[derive(Clone)]
109pub struct PinnedRuntime(Arc<Runtime>);
110
111/// See [the module-level documentation](..).
112pub struct RuntimeService<TPlat: PlatformRef> {
113 /// Configuration of the background task. Used to restart the background task if necessary.
114 background_task_config: BackgroundTaskConfig<TPlat>,
115
116 /// Sender to send messages to the background task.
117 to_background: Mutex<async_channel::Sender<ToBackground<TPlat>>>,
118}
119
120impl<TPlat: PlatformRef> RuntimeService<TPlat> {
121 /// Initializes a new runtime service.
122 pub fn new(config: Config<TPlat>) -> Self {
123 // Target to use for all the logs of this service.
124 let log_target = format!("runtime-{}", config.log_name);
125
126 let background_task_config = BackgroundTaskConfig {
127 log_target: log_target.clone(),
128 platform: config.platform.clone(),
129 sync_service: config.sync_service,
130 network_service: config.network_service,
131 genesis_block_scale_encoded_header: config.genesis_block_scale_encoded_header,
132 };
133
134 // Spawns a task that runs in the background and updates the content of the mutex.
135 let to_background;
136 config.platform.spawn_task(log_target.clone().into(), {
137 let (tx, rx) = async_channel::bounded(16);
138 let tx_weak = tx.downgrade();
139 to_background = tx;
140 let background_task_config = background_task_config.clone();
141 run_background(background_task_config, rx, tx_weak)
142 });
143
144 RuntimeService {
145 background_task_config,
146 to_background: Mutex::new(to_background),
147 }
148 }
149
150 /// Calls [`sync_service::SyncService::block_number_bytes`] on the sync service associated to
151 /// this runtime service.
152 pub fn block_number_bytes(&self) -> usize {
153 self.background_task_config
154 .sync_service
155 .block_number_bytes()
156 }
157
158 /// Subscribes to the state of the chain: the current state and the new blocks.
159 ///
160 /// This function only returns once the runtime of the current finalized block is known. This
161 /// might take a long time.
162 ///
163 /// Only up to `buffer_size` block notifications are buffered in the channel. If the channel
164 /// is full when a new notification is attempted to be pushed, the channel gets closed.
165 ///
166 /// A maximum number of finalized or non-canonical (i.e. not part of the finalized chain)
167 /// pinned blocks must be passed, indicating the maximum number of blocks that are finalized
168 /// or non-canonical that the runtime service will pin at the same time for this subscription.
169 /// If this maximum is reached, the channel will get closed. In situations where the subscriber
170 /// is guaranteed to always properly unpin blocks, a value of `usize::MAX` can be
171 /// passed in order to ignore this maximum.
172 ///
173 /// The channel also gets closed if a gap in the finality happens, such as after a Grandpa
174 /// warp syncing.
175 ///
176 /// See [`SubscribeAll`] for information about the return value.
177 pub async fn subscribe_all(
178 &self,
179 buffer_size: usize,
180 max_pinned_blocks: NonZero<usize>,
181 ) -> SubscribeAll<TPlat> {
182 loop {
183 let (result_tx, result_rx) = oneshot::channel();
184 let _ = self
185 .send_message_or_restart_service(ToBackground::SubscribeAll(
186 ToBackgroundSubscribeAll {
187 result_tx,
188 buffer_size,
189 max_pinned_blocks,
190 },
191 ))
192 .await;
193
194 if let Ok(subscribe_all) = result_rx.await {
195 break subscribe_all;
196 }
197 }
198 }
199
200 /// Unpins a block after it has been reported by a subscription.
201 ///
202 /// Has no effect if the [`SubscriptionId`] is not or no longer valid (as the runtime service
203 /// can kill any subscription at any moment).
204 ///
205 /// # Panic
206 ///
207 /// Panics if the block hash has not been reported or has already been unpinned.
208 ///
209 // TODO: add #[track_caller] once possible, see https://github.com/rust-lang/rust/issues/87417
210 pub async fn unpin_block(&self, subscription_id: SubscriptionId, block_hash: [u8; 32]) {
211 let (result_tx, result_rx) = oneshot::channel();
212 let _ = self
213 .to_background
214 .lock()
215 .await
216 .send(ToBackground::UnpinBlock {
217 result_tx,
218 subscription_id,
219 block_hash,
220 })
221 .await;
222 match result_rx.await {
223 Ok(Ok(())) => {
224 // Background task has indicated success.
225 }
226 Err(_) => {
227 // Background task has crashed. Subscription is stale. Function has no effect.
228 }
229 Ok(Err(_)) => {
230 // Background task has indicated that the block has already been unpinned.
231 panic!()
232 }
233 }
234 }
235
236 /// Returns the storage value and Merkle value of the `:code` key of the finalized block.
237 ///
238 /// Returns `None` if the runtime of the current finalized block is not known yet.
239 // TODO: this function has a bad API but is hopefully temporary
240 pub async fn finalized_runtime_storage_merkle_values(
241 &self,
242 ) -> Option<(Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<Nibble>>)> {
243 let (result_tx, result_rx) = oneshot::channel();
244
245 let _ = self
246 .to_background
247 .lock()
248 .await
249 .send(ToBackground::FinalizedRuntimeStorageMerkleValues { result_tx })
250 .await;
251
252 result_rx.await.unwrap_or(None)
253 }
254
255 /// Pins the runtime of a pinned block.
256 ///
257 /// The hash of the block passed as parameter corresponds to the block whose runtime is to
258 /// be pinned. The block must be currently pinned in the context of the provided
259 /// [`SubscriptionId`].
260 ///
261 /// Returns the pinned runtime, plus the state trie root hash and height of the block.
262 ///
263 /// Returns an error if the subscription is stale, meaning that it has been reset by the
264 /// runtime service.
265 pub async fn pin_pinned_block_runtime(
266 &self,
267 subscription_id: SubscriptionId,
268 block_hash: [u8; 32],
269 ) -> Result<(PinnedRuntime, [u8; 32], u64), PinPinnedBlockRuntimeError> {
270 let (result_tx, result_rx) = oneshot::channel();
271
272 let _ = self
273 .to_background
274 .lock()
275 .await
276 .send(ToBackground::PinPinnedBlockRuntime {
277 result_tx,
278 subscription_id,
279 block_hash,
280 })
281 .await;
282
283 match result_rx.await {
284 Ok(result) => result.map(|(r, v, n)| (PinnedRuntime(r), v, n)),
285 Err(_) => {
286 // Background service has crashed. This means that the subscription is obsolete.
287 Err(PinPinnedBlockRuntimeError::ObsoleteSubscription)
288 }
289 }
290 }
291
292 /// Performs a runtime call.
293 ///
294 /// The hash of the block passed as parameter corresponds to the block whose runtime to use
295 /// to make the call. The block must be currently pinned in the context of the provided
296 /// [`SubscriptionId`].
297 ///
298 /// Returns an error if the subscription is stale, meaning that it has been reset by the
299 /// runtime service.
300 pub async fn runtime_call(
301 &self,
302 pinned_runtime: PinnedRuntime,
303 block_hash: [u8; 32],
304 block_number: u64,
305 block_state_trie_root_hash: [u8; 32],
306 function_name: String,
307 required_api_version: Option<(String, ops::RangeInclusive<u32>)>,
308 parameters_vectored: Vec<u8>,
309 total_attempts: u32,
310 timeout_per_request: Duration,
311 max_parallel: NonZero<u32>,
312 ) -> Result<RuntimeCallSuccess, RuntimeCallError> {
313 let (result_tx, result_rx) = oneshot::channel();
314
315 self.send_message_or_restart_service(ToBackground::RuntimeCall {
316 result_tx,
317 pinned_runtime: pinned_runtime.0,
318 block_hash,
319 block_number,
320 block_state_trie_root_hash,
321 function_name,
322 required_api_version,
323 parameters_vectored,
324 total_attempts,
325 timeout_per_request,
326 _max_parallel: max_parallel,
327 })
328 .await;
329
330 match result_rx.await {
331 Ok(result) => result,
332 Err(_) => {
333 // Background service has crashed.
334 Err(RuntimeCallError::Crash)
335 }
336 }
337 }
338
339 /// Tries to find a runtime within the [`RuntimeService`] that has the given storage code and
340 /// heap pages. If none is found, compiles the runtime and stores it within the
341 /// [`RuntimeService`].
342 pub async fn compile_and_pin_runtime(
343 &self,
344 storage_code: Option<Vec<u8>>,
345 storage_heap_pages: Option<Vec<u8>>,
346 code_merkle_value: Option<Vec<u8>>,
347 closest_ancestor_excluding: Option<Vec<Nibble>>,
348 ) -> Result<PinnedRuntime, CompileAndPinRuntimeError> {
349 let (result_tx, result_rx) = oneshot::channel();
350
351 let _ = self
352 .send_message_or_restart_service(ToBackground::CompileAndPinRuntime {
353 result_tx,
354 storage_code,
355 storage_heap_pages,
356 code_merkle_value,
357 closest_ancestor_excluding,
358 })
359 .await;
360
361 Ok(PinnedRuntime(
362 result_rx
363 .await
364 .map_err(|_| CompileAndPinRuntimeError::Crash)?,
365 ))
366 }
367
368 /// Returns the runtime specification of the given runtime.
369 pub async fn pinned_runtime_specification(
370 &self,
371 pinned_runtime: PinnedRuntime,
372 ) -> Result<executor::CoreVersion, PinnedRuntimeSpecificationError> {
373 match &pinned_runtime.0.runtime {
374 Ok(rt) => Ok(rt.runtime_version().clone()),
375 Err(error) => Err(PinnedRuntimeSpecificationError::InvalidRuntime(
376 error.clone(),
377 )),
378 }
379 }
380
381 /// Returns true if it is believed that we are near the head of the chain.
382 ///
383 /// The way this method is implemented is opaque and cannot be relied on. The return value
384 /// should only ever be shown to the user and not used for any meaningful logic.
385 pub async fn is_near_head_of_chain_heuristic(&self) -> bool {
386 let (result_tx, result_rx) = oneshot::channel();
387 let _ = self
388 .to_background
389 .lock()
390 .await
391 .send(ToBackground::IsNearHeadOfChainHeuristic { result_tx })
392 .await;
393 result_rx.await.unwrap_or(false)
394 }
395
396 /// Sends a message to the background task. Restarts the background task if it has crashed.
397 async fn send_message_or_restart_service(&self, message: ToBackground<TPlat>) {
398 let mut lock = self.to_background.lock().await;
399
400 if lock.is_closed() {
401 let (tx, rx) = async_channel::bounded(16);
402 let tx_weak = tx.downgrade();
403 *lock = tx;
404
405 self.background_task_config.platform.spawn_task(
406 self.background_task_config.log_target.clone().into(),
407 {
408 let background_task_config = self.background_task_config.clone();
409 let platform = background_task_config.platform.clone();
410 async move {
411 // Sleep for a bit in order to avoid infinite loops of repeated crashes.
412 background_task_config
413 .platform
414 .sleep(Duration::from_secs(2))
415 .await;
416 let log_target = background_task_config.log_target.clone();
417 log!(&platform, Debug, &log_target, "restart");
418 run_background(background_task_config, rx, tx_weak).await;
419 log!(&platform, Debug, &log_target, "shutdown");
420 }
421 },
422 );
423 }
424
425 // Note that the background task might have crashed again at this point already, and thus
426 // errors are not impossible.
427 let _ = lock.send(message).await;
428 }
429}
430
431/// Return value of [`RuntimeService::subscribe_all`].
432pub struct SubscribeAll<TPlat: PlatformRef> {
433 /// SCALE-encoded header of the finalized block at the time of the subscription.
434 pub finalized_block_scale_encoded_header: Vec<u8>,
435
436 /// If the runtime of the finalized block is known, contains the information about it.
437 pub finalized_block_runtime: Result<executor::CoreVersion, RuntimeError>,
438
439 /// List of all known non-finalized blocks at the time of subscription.
440 ///
441 /// Only one element in this list has [`BlockNotification::is_new_best`] equal to true.
442 ///
443 /// The blocks are guaranteed to be ordered so that parents are always found before their
444 /// children.
445 pub non_finalized_blocks_ancestry_order: Vec<BlockNotification>,
446
447 /// Channel onto which new blocks are sent. The channel gets closed if it is full when a new
448 /// block needs to be reported.
449 pub new_blocks: Subscription<TPlat>,
450}
451
/// Opaque identifier of a [`Subscription`]. See [`Subscription::id`].
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubscriptionId(u64);

impl fmt::Debug for SubscriptionId {
    /// Formats the identifier exactly like the integer it wraps, honoring the formatter flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let SubscriptionId(raw) = self;
        <u64 as fmt::Debug>::fmt(raw, f)
    }
}
460
461pub struct Subscription<TPlat: PlatformRef> {
462 subscription_id: u64,
463 channel: Pin<Box<async_channel::Receiver<Notification>>>,
464 to_background: async_channel::Sender<ToBackground<TPlat>>,
465}
466
467impl<TPlat: PlatformRef> Subscription<TPlat> {
468 pub async fn next(&mut self) -> Option<Notification> {
469 self.channel.next().await
470 }
471
472 /// Returns an opaque identifier that can be used to call [`RuntimeService::unpin_block`].
473 pub fn id(&self) -> SubscriptionId {
474 SubscriptionId(self.subscription_id)
475 }
476
477 /// Unpins a block after it has been reported.
478 ///
479 /// # Panic
480 ///
481 /// Panics if the block hash has not been reported or has already been unpinned.
482 ///
483 pub async fn unpin_block(&self, block_hash: [u8; 32]) {
484 let (result_tx, result_rx) = oneshot::channel();
485 let _ = self
486 .to_background
487 .send(ToBackground::UnpinBlock {
488 result_tx,
489 subscription_id: SubscriptionId(self.subscription_id),
490 block_hash,
491 })
492 .await;
493 result_rx.await.unwrap().unwrap()
494 }
495}
496
497/// Notification about a new block or a new finalized block.
498///
499/// See [`RuntimeService::subscribe_all`].
500#[derive(Debug, Clone)]
501pub enum Notification {
502 /// A non-finalized block has been finalized.
503 Finalized {
504 /// BLAKE2 hash of the header of the block that has been finalized.
505 ///
506 /// A block with this hash is guaranteed to have earlier been reported in a
507 /// [`BlockNotification`], either in [`SubscribeAll::non_finalized_blocks_ancestry_order`]
508 /// or in a [`Notification::Block`].
509 ///
510 /// It is also guaranteed that this block is a child of the previously-finalized block. In
511 /// other words, if multiple blocks are finalized at the same time, only one
512 /// [`Notification::Finalized`] is generated and contains the highest finalized block.
513 ///
514 /// If it is not possible for the [`RuntimeService`] to avoid a gap in the list of
515 /// finalized blocks, then the [`SubscribeAll::new_blocks`] channel is force-closed.
516 hash: [u8; 32],
517
518 /// If the current best block is pruned by the finalization, contains the updated hash
519 /// of the best block after the finalization.
520 ///
521 /// If the newly-finalized block is an ancestor of the current best block, then this field
522 /// contains the hash of this current best block. Otherwise, the best block is now
523 /// the non-finalized block with the given hash.
524 ///
525 /// A block with this hash is guaranteed to have earlier been reported in a
526 /// [`BlockNotification`], either in [`SubscribeAll::non_finalized_blocks_ancestry_order`]
527 /// or in a [`Notification::Block`].
528 best_block_hash_if_changed: Option<[u8; 32]>,
529
530 /// List of BLAKE2 hashes of the headers of the blocks that have been discarded because
531 /// they're not descendants of the newly-finalized block.
532 ///
533 /// This list contains all the siblings of the newly-finalized block and all their
534 /// descendants.
535 pruned_blocks: Vec<[u8; 32]>,
536 },
537
538 /// A new block has been added to the list of unfinalized blocks.
539 Block(BlockNotification),
540
541 /// The best block has changed to a different one.
542 BestBlockChanged {
543 /// Hash of the new best block.
544 ///
545 /// This can be either the hash of the latest finalized block or the hash of a
546 /// non-finalized block.
547 hash: [u8; 32],
548 },
549}
550
551/// Notification about a new block.
552///
553/// See [`RuntimeService::subscribe_all`].
554#[derive(Debug, Clone)]
555pub struct BlockNotification {
556 /// True if this block is considered as the best block of the chain.
557 pub is_new_best: bool,
558
559 /// SCALE-encoded header of the block.
560 pub scale_encoded_header: Vec<u8>,
561
562 /// BLAKE2 hash of the header of the parent of this block.
563 ///
564 ///
565 /// A block with this hash is guaranteed to have earlier been reported in a
566 /// [`BlockNotification`], either in [`SubscribeAll::non_finalized_blocks_ancestry_order`] or
567 /// in a [`Notification::Block`].
568 ///
569 /// > **Note**: The header of a block contains the hash of its parent. When it comes to
570 /// > consensus algorithms such as Babe or Aura, the syncing code verifies that this
571 /// > hash, stored in the header, actually corresponds to a valid block. However,
572 /// > when it comes to parachain consensus, no such verification is performed.
573 /// > Contrary to the hash stored in the header, the value of this field is
574 /// > guaranteed to refer to a block that is known by the syncing service. This
575 /// > allows a subscriber of the state of the chain to precisely track the hierarchy
576 /// > of blocks, without risking to run into a problem in case of a block with an
577 /// > invalid header.
578 pub parent_hash: [u8; 32],
579
580 /// If the runtime of the block is different from its parent, contains the information about
581 /// the new runtime.
582 pub new_runtime: Option<Result<executor::CoreVersion, RuntimeError>>,
583}
584
/// Outcome of a runtime call that has succeeded.
#[derive(Debug)]
pub struct RuntimeCallSuccess {
    /// Bytes returned by the runtime call.
    pub output: Vec<u8>,

    /// Version of the API that was found. `Some` if and only if an API requirement was passed.
    pub api_version: Option<u32>,
}
594
595/// See [`RuntimeService::pin_pinned_block_runtime`].
596#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
597pub enum PinPinnedBlockRuntimeError {
598 /// Subscription is dead.
599 ObsoleteSubscription,
600
601 /// Requested block isn't pinned by the subscription.
602 BlockNotPinned,
603}
604
605/// See [`RuntimeService::pinned_runtime_specification`].
606#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
607pub enum PinnedRuntimeSpecificationError {
608 /// The runtime is invalid.
609 InvalidRuntime(RuntimeError),
610}
611
612/// See [`RuntimeService::runtime_call`].
613#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
614pub enum RuntimeCallError {
615 /// The runtime of the requested block is invalid.
616 InvalidRuntime(RuntimeError),
617
618 /// API version required for the call isn't fulfilled.
619 ApiVersionRequirementUnfulfilled,
620
621 /// Runtime service has crashed while the call was in progress.
622 ///
623 /// This doesn't necessarily indicate that the call was responsible for this crash.
624 Crash,
625
626 /// Error during the execution of the runtime.
627 ///
628 /// There is no point in trying the same call again, as it would result in the same error.
629 #[display("Error during the execution of the runtime: {_0}")]
630 Execution(RuntimeCallExecutionError),
631
632 /// Error trying to access the storage required for the runtime call.
633 ///
634 /// Because these errors are non-fatal, the operation is attempted multiple times, and as such
635 /// there can be multiple errors.
636 ///
637 /// Trying the same call again might succeed.
638 #[display("Error trying to access the storage required for the runtime call")]
639 // TODO: better display?
640 Inaccessible(#[error(not(source))] Vec<RuntimeCallInaccessibleError>),
641}
642
643/// See [`RuntimeCallError::Execution`].
644#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
645pub enum RuntimeCallExecutionError {
646 /// Failed to initialize the virtual machine.
647 Start(executor::host::StartErr),
648 /// Error during the execution of the virtual machine.
649 Execution(executor::runtime_call::ErrorDetail),
650 /// Virtual machine has called a host function that it is not allowed to call.
651 ForbiddenHostFunction,
652}
653
654/// See [`RuntimeCallError::Inaccessible`].
655#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
656pub enum RuntimeCallInaccessibleError {
657 /// Failed to download the call proof from the network.
658 Network(network_service::CallProofRequestError),
659 /// Call proof downloaded from the network has an invalid format.
660 InvalidCallProof(proof_decode::Error),
661 /// One or more entries are missing from the downloaded call proof.
662 MissingProofEntry,
663}
664
665/// Error when analyzing the runtime.
666#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
667pub enum RuntimeError {
668 /// The `:code` key of the storage is empty.
669 CodeNotFound,
670 /// Error while parsing the `:heappages` storage value.
671 #[display("Failed to parse `:heappages` storage value: {_0}")]
672 InvalidHeapPages(executor::InvalidHeapPagesError),
673 /// Error while compiling the runtime.
674 #[display("{_0}")]
675 Build(executor::host::NewErr),
676}
677
678/// Error potentially returned by [`RuntimeService::compile_and_pin_runtime`].
679#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
680pub enum CompileAndPinRuntimeError {
681 /// Background service has crashed while compiling this runtime. The crash might however not
682 /// necessarily be caused by the runtime compilation.
683 Crash,
684}
685
686/// Message towards the background task.
687enum ToBackground<TPlat: PlatformRef> {
688 SubscribeAll(ToBackgroundSubscribeAll<TPlat>),
689 CompileAndPinRuntime {
690 result_tx: oneshot::Sender<Arc<Runtime>>,
691 storage_code: Option<Vec<u8>>,
692 storage_heap_pages: Option<Vec<u8>>,
693 code_merkle_value: Option<Vec<u8>>,
694 closest_ancestor_excluding: Option<Vec<Nibble>>,
695 },
696 FinalizedRuntimeStorageMerkleValues {
697 // TODO: overcomplicated
698 result_tx: oneshot::Sender<Option<(Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<Nibble>>)>>,
699 },
700 IsNearHeadOfChainHeuristic {
701 result_tx: oneshot::Sender<bool>,
702 },
703 UnpinBlock {
704 result_tx: oneshot::Sender<Result<(), ()>>,
705 subscription_id: SubscriptionId,
706 block_hash: [u8; 32],
707 },
708 PinPinnedBlockRuntime {
709 result_tx:
710 oneshot::Sender<Result<(Arc<Runtime>, [u8; 32], u64), PinPinnedBlockRuntimeError>>,
711 subscription_id: SubscriptionId,
712 block_hash: [u8; 32],
713 },
714 RuntimeCall {
715 result_tx: oneshot::Sender<Result<RuntimeCallSuccess, RuntimeCallError>>,
716 pinned_runtime: Arc<Runtime>,
717 block_hash: [u8; 32],
718 block_number: u64,
719 block_state_trie_root_hash: [u8; 32],
720 function_name: String,
721 required_api_version: Option<(String, ops::RangeInclusive<u32>)>,
722 parameters_vectored: Vec<u8>,
723 total_attempts: u32,
724 timeout_per_request: Duration,
725 _max_parallel: NonZero<u32>,
726 },
727}
728
729struct ToBackgroundSubscribeAll<TPlat: PlatformRef> {
730 result_tx: oneshot::Sender<SubscribeAll<TPlat>>,
731 buffer_size: usize,
732 max_pinned_blocks: NonZero<usize>,
733}
734
735#[derive(Clone)]
736struct PinnedBlock {
737 /// Reference-counted runtime of the pinned block.
738 runtime: Arc<Runtime>,
739
740 /// Hash of the trie root of the pinned block.
741 state_trie_root_hash: [u8; 32],
742
743 /// Height of the pinned block.
744 block_number: u64,
745
746 /// `true` if the block is non-finalized and part of the canonical chain.
747 /// If `true`, then the block doesn't count towards the maximum number of pinned blocks of
748 /// the subscription.
749 block_ignores_limit: bool,
750}
751
/// Block as stored in the tree of blocks of the background task.
#[derive(Clone)]
struct Block {
    /// Hash of the block. This is redundant with `scale_encoded_header`, but the hash is
    /// needed so often that caching it makes sense.
    hash: [u8; 32],

    /// Number (height) of the block.
    height: u64,

    /// SCALE-encoded header of the block.
    /// Always valid for the output best and finalized blocks. No guarantee of validity is
    /// given for the other blocks.
    scale_encoded_header: Vec<u8>,
}
766
767#[derive(Clone)]
768struct BackgroundTaskConfig<TPlat: PlatformRef> {
769 log_target: String,
770 platform: TPlat,
771 sync_service: Arc<sync_service::SyncService<TPlat>>,
772 network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
773 genesis_block_scale_encoded_header: Vec<u8>,
774}
775
776async fn run_background<TPlat: PlatformRef>(
777 config: BackgroundTaskConfig<TPlat>,
778 to_background: async_channel::Receiver<ToBackground<TPlat>>,
779 to_background_tx: async_channel::WeakSender<ToBackground<TPlat>>,
780) {
781 log!(
782 &config.platform,
783 Trace,
784 &config.log_target,
785 "start",
786 genesis_block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
787 &config.genesis_block_scale_encoded_header
788 ))
789 );
790
791 // State machine containing all the state that will be manipulated below.
792 let mut background = {
793 let tree = {
794 let mut tree = async_tree::AsyncTree::new(async_tree::Config {
795 finalized_async_user_data: None,
796 retry_after_failed: Duration::from_secs(10),
797 blocks_capacity: 32,
798 });
799 let node_index = tree.input_insert_block(
800 Block {
801 hash: header::hash_from_scale_encoded_header(
802 &config.genesis_block_scale_encoded_header,
803 ),
804 height: 0,
805 scale_encoded_header: config.genesis_block_scale_encoded_header,
806 },
807 None,
808 false,
809 true,
810 );
811 tree.input_finalize(node_index);
812
813 Tree::FinalizedBlockRuntimeUnknown { tree }
814 };
815
816 Background {
817 log_target: config.log_target.clone(),
818 platform: config.platform.clone(),
819 sync_service: config.sync_service.clone(),
820 network_service: config.network_service.clone(),
821 to_background: Box::pin(to_background.clone()),
822 to_background_tx: to_background_tx.clone(),
823 next_subscription_id: 0,
824 tree,
825 runtimes: slab::Slab::with_capacity(2),
826 pending_subscriptions: VecDeque::with_capacity(8),
827 blocks_stream: None,
828 runtime_downloads: stream::FuturesUnordered::new(),
829 progress_runtime_call_requests: stream::FuturesUnordered::new(),
830 }
831 };
832
833 // Inner loop. Process incoming events.
834 loop {
835 // Yield at every loop in order to provide better tasks granularity.
836 futures_lite::future::yield_now().await;
837
838 enum WakeUpReason<TPlat: PlatformRef> {
839 MustSubscribe,
840 StartDownload(async_tree::AsyncOpId, async_tree::NodeIndex),
841 TreeAdvanceFinalizedKnown(async_tree::OutputUpdate<Block, Arc<Runtime>>),
842 TreeAdvanceFinalizedUnknown(async_tree::OutputUpdate<Block, Option<Arc<Runtime>>>),
843 StartPendingSubscribeAll(ToBackgroundSubscribeAll<TPlat>),
844 Notification(sync_service::Notification),
845 SyncServiceSubscriptionReset,
846 ToBackground(ToBackground<TPlat>),
847 ForegroundClosed,
848 RuntimeDownloadFinished(
849 async_tree::AsyncOpId,
850 Result<
851 (
852 Option<Vec<u8>>,
853 Option<Vec<u8>>,
854 Option<Vec<u8>>,
855 Option<Vec<Nibble>>,
856 ),
857 RuntimeDownloadError,
858 >,
859 ),
860 ProgressRuntimeCallRequest(ProgressRuntimeCallRequest),
861 }
862
863 // Wait for something to happen or for some processing to be necessary.
864 let wake_up_reason: WakeUpReason<_> = {
865 let finalized_block_known =
866 matches!(background.tree, Tree::FinalizedBlockRuntimeKnown { .. });
867 let num_runtime_downloads = background.runtime_downloads.len();
868 let any_subscription = match &background.tree {
869 Tree::FinalizedBlockRuntimeKnown {
870 all_blocks_subscriptions,
871 ..
872 } => !all_blocks_subscriptions.is_empty(),
873 Tree::FinalizedBlockRuntimeUnknown { .. } => false,
874 };
875 let any_pending_subscription = !background.pending_subscriptions.is_empty();
876 async {
877 if finalized_block_known {
878 if let Some(pending_subscription) = background.pending_subscriptions.pop_front()
879 {
880 WakeUpReason::StartPendingSubscribeAll(pending_subscription)
881 } else {
882 future::pending().await
883 }
884 } else {
885 future::pending().await
886 }
887 }
888 .or(async {
889 if let Some(blocks_stream) = background.blocks_stream.as_mut() {
890 if !any_subscription && !any_pending_subscription {
891 WakeUpReason::SyncServiceSubscriptionReset
892 } else {
893 blocks_stream.next().await.map_or(
894 WakeUpReason::SyncServiceSubscriptionReset,
895 WakeUpReason::Notification,
896 )
897 }
898 } else if any_subscription || any_pending_subscription {
899 // Only start subscribing to the sync service if there is any pending
900 // or active runtime service subscription.
// Note that subscriptions to the runtime service aren't destroyed when the
// sync service subscription is lost, but when the sync service is
// resubscribed.
904 WakeUpReason::MustSubscribe
905 } else {
906 future::pending().await
907 }
908 })
909 .or(async {
910 background
911 .to_background
912 .next()
913 .await
914 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::ToBackground)
915 })
916 .or(async {
917 if !background.runtime_downloads.is_empty() {
918 let (async_op_id, download_result) =
919 background.runtime_downloads.select_next_some().await;
920 WakeUpReason::RuntimeDownloadFinished(async_op_id, download_result)
921 } else {
922 future::pending().await
923 }
924 })
925 .or(async {
926 if !background.progress_runtime_call_requests.is_empty() {
927 let result = background
928 .progress_runtime_call_requests
929 .select_next_some()
930 .await;
931 WakeUpReason::ProgressRuntimeCallRequest(result)
932 } else {
933 future::pending().await
934 }
935 })
936 .or(async {
937 loop {
938 // There might be a new runtime download to start.
939 // Don't download more than 2 runtimes at a time.
940 let wait = if num_runtime_downloads < 2 {
941 // Grab what to download. If there's nothing more to download, do nothing.
942 let async_op = match &mut background.tree {
943 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
944 tree.next_necessary_async_op(&background.platform.now())
945 }
946 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
947 tree.next_necessary_async_op(&background.platform.now())
948 }
949 };
950
951 match async_op {
952 async_tree::NextNecessaryAsyncOp::Ready(dl) => {
953 break WakeUpReason::StartDownload(dl.id, dl.block_index);
954 }
955 async_tree::NextNecessaryAsyncOp::NotReady { when } => {
956 if let Some(when) = when {
957 either::Left(background.platform.sleep_until(when))
958 } else {
959 either::Right(future::pending())
960 }
961 }
962 }
963 } else {
964 either::Right(future::pending())
965 };
966
967 match &mut background.tree {
968 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
969 match tree.try_advance_output() {
970 Some(update) => {
971 break WakeUpReason::TreeAdvanceFinalizedKnown(update);
972 }
973 None => wait.await,
974 }
975 }
976 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
977 match tree.try_advance_output() {
978 Some(update) => {
979 break WakeUpReason::TreeAdvanceFinalizedUnknown(update);
980 }
981 None => wait.await,
982 }
983 }
984 }
985 }
986 })
987 .await
988 };
989
990 match wake_up_reason {
991 WakeUpReason::StartDownload(download_id, block_index) => {
992 let block = match &mut background.tree {
993 Tree::FinalizedBlockRuntimeKnown { tree, .. } => &tree[block_index],
994 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => &tree[block_index],
995 };
996
997 log!(
998 &background.platform,
999 Debug,
1000 &background.log_target,
1001 "block-runtime-download-start",
1002 block_hash = HashDisplay(&block.hash)
1003 );
1004
1005 // Dispatches a runtime download task to `runtime_downloads`.
1006 background.runtime_downloads.push(Box::pin({
1007 let future = download_runtime(
1008 background.sync_service.clone(),
1009 block.hash,
1010 &block.scale_encoded_header,
1011 );
1012
1013 async move { (download_id, future.await) }
1014 }));
1015 }
1016
1017 WakeUpReason::TreeAdvanceFinalizedKnown(async_tree::OutputUpdate::Finalized {
1018 user_data: new_finalized,
1019 best_output_block_updated,
1020 pruned_blocks,
1021 former_finalized_async_op_user_data: former_finalized_runtime,
1022 ..
1023 }) => {
1024 let Tree::FinalizedBlockRuntimeKnown {
1025 tree,
1026 finalized_block,
1027 all_blocks_subscriptions,
1028 pinned_blocks,
1029 } = &mut background.tree
1030 else {
1031 unreachable!()
1032 };
1033
1034 *finalized_block = new_finalized;
1035 let best_block_hash_if_changed = if best_output_block_updated {
1036 Some(
1037 tree.output_best_block_index()
1038 .map_or(finalized_block.hash, |(idx, _)| tree[idx].hash),
1039 )
1040 } else {
1041 None
1042 };
1043
1044 log!(
1045 &background.platform,
1046 Trace,
1047 &background.log_target,
1048 "output-chain-finalized",
1049 block_hash = HashDisplay(&finalized_block.hash),
1050 best_block_hash = if let Some(best_block_hash) = best_block_hash_if_changed {
1051 Cow::Owned(HashDisplay(&best_block_hash).to_string())
1052 } else {
1053 Cow::Borrowed("<unchanged>")
1054 },
1055 num_subscribers = all_blocks_subscriptions.len()
1056 );
1057
1058 // The finalization might cause some runtimes in the list of runtimes
1059 // to have become unused. Clean them up.
1060 drop(former_finalized_runtime);
1061 background
1062 .runtimes
1063 .retain(|_, runtime| runtime.strong_count() > 0);
1064
1065 let all_blocks_notif = Notification::Finalized {
1066 best_block_hash_if_changed,
1067 hash: finalized_block.hash,
1068 pruned_blocks: pruned_blocks.iter().map(|(_, b, _)| b.hash).collect(),
1069 };
1070
1071 let mut to_remove = Vec::new();
1072 for (subscription_id, (sender, finalized_pinned_remaining)) in
1073 all_blocks_subscriptions.iter_mut()
1074 {
1075 let count_limit = pruned_blocks.len() + 1;
1076
1077 if *finalized_pinned_remaining < count_limit {
1078 to_remove.push(*subscription_id);
1079 continue;
1080 }
1081
1082 if sender.try_send(all_blocks_notif.clone()).is_err() {
1083 to_remove.push(*subscription_id);
1084 continue;
1085 }
1086
1087 *finalized_pinned_remaining -= count_limit;
1088
1089 // Mark the finalized and pruned blocks as finalized or non-canonical.
1090 for block in iter::once(&finalized_block.hash)
1091 .chain(pruned_blocks.iter().map(|(_, b, _)| &b.hash))
1092 {
1093 if let Some(pin) = pinned_blocks.get_mut(&(*subscription_id, *block)) {
1094 debug_assert!(pin.block_ignores_limit);
1095 pin.block_ignores_limit = false;
1096 }
1097 }
1098 }
1099 for to_remove in to_remove {
1100 all_blocks_subscriptions.remove(&to_remove);
1101 let pinned_blocks_to_remove = pinned_blocks
1102 .range((to_remove, [0; 32])..=(to_remove, [0xff; 32]))
1103 .map(|((_, h), _)| *h)
1104 .collect::<Vec<_>>();
1105 for block in pinned_blocks_to_remove {
1106 pinned_blocks.remove(&(to_remove, block));
1107 }
1108 }
1109 }
1110
1111 WakeUpReason::TreeAdvanceFinalizedKnown(async_tree::OutputUpdate::Block(block)) => {
1112 let Tree::FinalizedBlockRuntimeKnown {
1113 tree,
1114 finalized_block,
1115 all_blocks_subscriptions,
1116 pinned_blocks,
1117 } = &mut background.tree
1118 else {
1119 unreachable!()
1120 };
1121
1122 let block_index = block.index;
1123 let block_runtime = tree.block_async_user_data(block_index).unwrap().clone();
1124 let block_hash = tree[block_index].hash;
1125 let scale_encoded_header = tree[block_index].scale_encoded_header.clone();
1126 let is_new_best = block.is_new_best;
1127
1128 let (block_number, state_trie_root_hash) = {
1129 let decoded = header::decode(
1130 &scale_encoded_header,
1131 background.sync_service.block_number_bytes(),
1132 )
1133 .unwrap();
1134 (decoded.number, *decoded.state_root)
1135 };
1136
1137 let parent_runtime = tree
1138 .parent(block_index)
1139 .map_or(tree.output_finalized_async_user_data().clone(), |idx| {
1140 tree.block_async_user_data(idx).unwrap().clone()
1141 });
1142
1143 log!(
1144 &background.platform,
1145 Trace,
1146 &background.log_target,
1147 "output-chain-new-block",
1148 block_hash = HashDisplay(&tree[block_index].hash),
1149 is_new_best,
1150 num_subscribers = all_blocks_subscriptions.len()
1151 );
1152
1153 let notif = Notification::Block(BlockNotification {
1154 parent_hash: tree
1155 .parent(block_index)
1156 .map_or(finalized_block.hash, |idx| tree[idx].hash),
1157 is_new_best,
1158 scale_encoded_header,
1159 new_runtime: if !Arc::ptr_eq(&parent_runtime, &block_runtime) {
1160 Some(
1161 block_runtime
1162 .runtime
1163 .as_ref()
1164 .map(|rt| rt.runtime_version().clone())
1165 .map_err(|err| err.clone()),
1166 )
1167 } else {
1168 None
1169 },
1170 });
1171
1172 let mut to_remove = Vec::new();
1173 for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() {
1174 if sender.try_send(notif.clone()).is_ok() {
1175 let _prev_value = pinned_blocks.insert(
1176 (*subscription_id, block_hash),
1177 PinnedBlock {
1178 runtime: block_runtime.clone(),
1179 state_trie_root_hash,
1180 block_number,
1181 block_ignores_limit: true,
1182 },
1183 );
1184 debug_assert!(_prev_value.is_none());
1185 } else {
1186 to_remove.push(*subscription_id);
1187 }
1188 }
1189 for to_remove in to_remove {
1190 all_blocks_subscriptions.remove(&to_remove);
1191 let pinned_blocks_to_remove = pinned_blocks
1192 .range((to_remove, [0; 32])..=(to_remove, [0xff; 32]))
1193 .map(|((_, h), _)| *h)
1194 .collect::<Vec<_>>();
1195 for block in pinned_blocks_to_remove {
1196 pinned_blocks.remove(&(to_remove, block));
1197 }
1198 }
1199 }
1200
1201 WakeUpReason::TreeAdvanceFinalizedKnown(
1202 async_tree::OutputUpdate::BestBlockChanged { best_block_index },
1203 ) => {
1204 let Tree::FinalizedBlockRuntimeKnown {
1205 tree,
1206 finalized_block,
1207 all_blocks_subscriptions,
1208 pinned_blocks,
1209 } = &mut background.tree
1210 else {
1211 unreachable!()
1212 };
1213
1214 let hash = best_block_index
1215 .map_or(&*finalized_block, |idx| &tree[idx])
1216 .hash;
1217
1218 log!(
1219 &background.platform,
1220 Trace,
1221 &background.log_target,
1222 "output-chain-best-block-update",
1223 block_hash = HashDisplay(&hash),
1224 num_subscribers = all_blocks_subscriptions.len()
1225 );
1226
1227 let notif = Notification::BestBlockChanged { hash };
1228
1229 let mut to_remove = Vec::new();
1230 for (subscription_id, (sender, _)) in all_blocks_subscriptions.iter_mut() {
1231 if sender.try_send(notif.clone()).is_err() {
1232 to_remove.push(*subscription_id);
1233 }
1234 }
1235 for to_remove in to_remove {
1236 all_blocks_subscriptions.remove(&to_remove);
1237 let pinned_blocks_to_remove = pinned_blocks
1238 .range((to_remove, [0; 32])..=(to_remove, [0xff; 32]))
1239 .map(|((_, h), _)| *h)
1240 .collect::<Vec<_>>();
1241 for block in pinned_blocks_to_remove {
1242 pinned_blocks.remove(&(to_remove, block));
1243 }
1244 }
1245 }
1246
1247 WakeUpReason::TreeAdvanceFinalizedUnknown(async_tree::OutputUpdate::Block(_))
1248 | WakeUpReason::TreeAdvanceFinalizedUnknown(
1249 async_tree::OutputUpdate::BestBlockChanged { .. },
1250 ) => {
1251 // Nothing to do.
1252 continue;
1253 }
1254
1255 WakeUpReason::TreeAdvanceFinalizedUnknown(async_tree::OutputUpdate::Finalized {
1256 user_data: new_finalized,
1257 former_finalized_async_op_user_data,
1258 best_output_block_updated,
1259 ..
1260 }) => {
1261 let Tree::FinalizedBlockRuntimeUnknown { tree, .. } = &mut background.tree else {
1262 unreachable!()
1263 };
1264
1265 // Make sure that this is the first finalized block whose runtime is
1266 // known, otherwise there's a pretty big bug somewhere.
1267 debug_assert!(former_finalized_async_op_user_data.is_none());
1268
1269 let best_block_hash_if_changed = if best_output_block_updated {
1270 Some(
1271 tree.output_best_block_index()
1272 .map_or(new_finalized.hash, |(idx, _)| tree[idx].hash),
1273 )
1274 } else {
1275 None
1276 };
1277 log!(
1278 &background.platform,
1279 Trace,
1280 &background.log_target,
1281 "output-chain-initialized",
1282 finalized_block_hash = HashDisplay(&new_finalized.hash),
1283 best_block_hash = if let Some(best_block_hash) = best_block_hash_if_changed {
1284 Cow::Owned(HashDisplay(&best_block_hash).to_string())
1285 } else {
1286 Cow::Borrowed("<unchanged>")
1287 },
1288 );
1289
1290 // Substitute `tree` with a dummy empty tree just in order to extract
1291 // the value. The `tree` only contains "async op user datas" equal
1292 // to `Some` (they're inserted manually when a download finishes)
1293 // except for the finalized block which has now just been extracted.
1294 // We can safely unwrap() all these user datas.
1295 let new_tree = mem::replace(
1296 tree,
1297 async_tree::AsyncTree::new(async_tree::Config {
1298 finalized_async_user_data: None,
1299 retry_after_failed: Duration::new(0, 0),
1300 blocks_capacity: 0,
1301 }),
1302 )
1303 .map_async_op_user_data(|runtime_index| runtime_index.unwrap());
1304
1305 // Change the state of `Background` to the "finalized runtime known" state.
1306 background.tree = Tree::FinalizedBlockRuntimeKnown {
1307 all_blocks_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
1308 32,
1309 Default::default(),
1310 ), // TODO: capacity?
1311 pinned_blocks: BTreeMap::new(),
1312 tree: new_tree,
1313 finalized_block: new_finalized,
1314 };
1315 }
1316
1317 WakeUpReason::MustSubscribe => {
1318 // Subscription to the sync service must be recreated.
1319
1320 // The buffer size should be large enough so that, if the CPU is busy, it
1321 // doesn't become full before the execution of the runtime service resumes.
1322 // Note that this `await` freezes the entire runtime service background task,
1323 // but the sync service guarantees that `subscribe_all` returns very quickly.
1324 let subscription = background.sync_service.subscribe_all(32, true).await;
1325
1326 log!(
1327 &background.platform,
1328 Trace,
1329 &background.log_target,
1330 "sync-service-subscribed",
1331 finalized_block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
1332 &subscription.finalized_block_scale_encoded_header
1333 )),
1334 finalized_block_runtime_known = ?subscription.finalized_block_runtime.is_some()
1335 );
1336
1337 // Update the state of `Background` with what we just grabbed.
1338 //
1339 // Note that the content of `Background` is reset unconditionally.
1340 // It might seem like a good idea to only reset the content of `Background` if the new
1341 // subscription has a different finalized block than currently. However, there is
1342 // absolutely no guarantee for the non-finalized blocks currently in the tree to be a
1343 // subset or superset of the non-finalized blocks in the new subscription.
1344 // Using the new subscription but keeping the existing tree could therefore result in
1345 // state inconsistencies.
1346 //
1347 // Additionally, the situation where a subscription is killed but the finalized block
1348 // didn't change should be extremely rare anyway.
1349 {
1350 background.runtimes = slab::Slab::with_capacity(2); // TODO: hardcoded capacity
1351
1352 // TODO: DRY below
1353 if let Some(finalized_block_runtime) = subscription.finalized_block_runtime {
1354 let finalized_block_hash = header::hash_from_scale_encoded_header(
1355 &subscription.finalized_block_scale_encoded_header,
1356 );
1357 let finalized_block_height = header::decode(
1358 &subscription.finalized_block_scale_encoded_header,
1359 background.sync_service.block_number_bytes(),
1360 )
1361 .unwrap()
1362 .number; // TODO: consider feeding the information from the sync service?
1363
1364 let storage_code_len = u64::try_from(
1365 finalized_block_runtime
1366 .storage_code
1367 .as_ref()
1368 .map_or(0, |v| v.len()),
1369 )
1370 .unwrap();
1371
1372 let runtime = Arc::new(Runtime {
1373 runtime_code: finalized_block_runtime.storage_code,
1374 heap_pages: finalized_block_runtime.storage_heap_pages,
1375 code_merkle_value: finalized_block_runtime.code_merkle_value,
1376 closest_ancestor_excluding: finalized_block_runtime
1377 .closest_ancestor_excluding,
1378 runtime: Ok(finalized_block_runtime.virtual_machine),
1379 });
1380
1381 match &runtime.runtime {
1382 Ok(runtime) => {
1383 log!(
1384 &background.platform,
1385 Info,
1386 &background.log_target,
1387 format!(
1388 "Finalized block runtime ready. Spec version: {}. \
1389 Size of `:code`: {}.",
1390 runtime.runtime_version().decode().spec_version,
1391 BytesDisplay(storage_code_len)
1392 )
1393 );
1394 }
1395 Err(error) => {
1396 log!(
1397 &background.platform,
1398 Warn,
1399 &background.log_target,
1400 format!(
1401 "Erronenous finalized block runtime. Size of \
1402 `:code`: {}.\nError: {}\nThis indicates an incompatibility \
1403 between smoldot and the chain.",
1404 BytesDisplay(storage_code_len),
1405 error
1406 )
1407 );
1408 }
1409 }
1410
1411 background.tree = Tree::FinalizedBlockRuntimeKnown {
1412 all_blocks_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
1413 32,
1414 Default::default(),
1415 ), // TODO: capacity?
1416 pinned_blocks: BTreeMap::new(),
1417 finalized_block: Block {
1418 hash: finalized_block_hash,
1419 height: finalized_block_height,
1420 scale_encoded_header: subscription
1421 .finalized_block_scale_encoded_header,
1422 },
1423 tree: {
1424 let mut tree =
1425 async_tree::AsyncTree::<_, Block, _>::new(async_tree::Config {
1426 finalized_async_user_data: runtime,
1427 retry_after_failed: Duration::from_secs(10), // TODO: hardcoded
1428 blocks_capacity: 32,
1429 });
1430
1431 for block in subscription.non_finalized_blocks_ancestry_order {
1432 let parent_index = if block.parent_hash == finalized_block_hash
1433 {
1434 None
1435 } else {
1436 Some(
1437 tree.input_output_iter_unordered()
1438 .find(|b| b.user_data.hash == block.parent_hash)
1439 .unwrap()
1440 .id,
1441 )
1442 };
1443
1444 let same_runtime_as_parent = same_runtime_as_parent(
1445 &block.scale_encoded_header,
1446 background.sync_service.block_number_bytes(),
1447 );
1448 let _ = tree.input_insert_block(
1449 Block {
1450 hash: header::hash_from_scale_encoded_header(
1451 &block.scale_encoded_header,
1452 ),
1453 height: header::decode(
1454 &block.scale_encoded_header,
1455 background.sync_service.block_number_bytes(),
1456 )
1457 .unwrap()
1458 .number, // TODO: consider feeding the information from the sync service?
1459 scale_encoded_header: block.scale_encoded_header,
1460 },
1461 parent_index,
1462 same_runtime_as_parent,
1463 block.is_new_best,
1464 );
1465 }
1466
1467 tree
1468 },
1469 };
1470 } else {
1471 background.tree = Tree::FinalizedBlockRuntimeUnknown {
1472 tree: {
1473 let mut tree = async_tree::AsyncTree::new(async_tree::Config {
1474 finalized_async_user_data: None,
1475 retry_after_failed: Duration::from_secs(10), // TODO: hardcoded
1476 blocks_capacity: 32,
1477 });
1478 let node_index = tree.input_insert_block(
1479 Block {
1480 hash: header::hash_from_scale_encoded_header(
1481 &subscription.finalized_block_scale_encoded_header,
1482 ),
1483 height: header::decode(
1484 &subscription.finalized_block_scale_encoded_header,
1485 background.sync_service.block_number_bytes(),
1486 )
1487 .unwrap()
1488 .number, // TODO: consider feeding the information from the sync service?
1489 scale_encoded_header: subscription
1490 .finalized_block_scale_encoded_header,
1491 },
1492 None,
1493 false,
1494 true,
1495 );
1496 tree.input_finalize(node_index);
1497
1498 for block in subscription.non_finalized_blocks_ancestry_order {
1499 // TODO: O(n)
1500 let parent_index = tree
1501 .input_output_iter_unordered()
1502 .find(|b| b.user_data.hash == block.parent_hash)
1503 .unwrap()
1504 .id;
1505
1506 let same_runtime_as_parent = same_runtime_as_parent(
1507 &block.scale_encoded_header,
1508 background.sync_service.block_number_bytes(),
1509 );
1510 let _ = tree.input_insert_block(
1511 Block {
1512 hash: header::hash_from_scale_encoded_header(
1513 &block.scale_encoded_header,
1514 ),
1515 height: header::decode(
1516 &block.scale_encoded_header,
1517 background.sync_service.block_number_bytes(),
1518 )
1519 .unwrap()
1520 .number, // TODO: consider feeding the information from the sync service?
1521 scale_encoded_header: block.scale_encoded_header,
1522 },
1523 Some(parent_index),
1524 same_runtime_as_parent,
1525 block.is_new_best,
1526 );
1527 }
1528
1529 tree
1530 },
1531 };
1532 }
1533 }
1534
1535 background.blocks_stream = Some(Box::pin(subscription.new_blocks));
1536 background.runtime_downloads = stream::FuturesUnordered::new();
1537 }
1538
1539 WakeUpReason::StartPendingSubscribeAll(pending_subscription) => {
1540 // A subscription is waiting to be started.
1541
1542 // Extract the components of the `FinalizedBlockRuntimeKnown`.
1543 let (tree, finalized_block, pinned_blocks, all_blocks_subscriptions) =
1544 match &mut background.tree {
1545 Tree::FinalizedBlockRuntimeKnown {
1546 tree,
1547 finalized_block,
1548 pinned_blocks,
1549 all_blocks_subscriptions,
1550 } => (
1551 tree,
1552 finalized_block,
1553 pinned_blocks,
1554 all_blocks_subscriptions,
1555 ),
1556 _ => unreachable!(),
1557 };
1558
1559 let (tx, new_blocks_channel) =
1560 async_channel::bounded(pending_subscription.buffer_size);
1561 let subscription_id = background.next_subscription_id;
1562 debug_assert_eq!(
1563 pinned_blocks
1564 .range((subscription_id, [0; 32])..=(subscription_id, [0xff; 32]))
1565 .count(),
1566 0
1567 );
1568 background.next_subscription_id += 1;
1569
1570 log!(
1571 &background.platform,
1572 Trace,
1573 &background.log_target,
1574 "pending-runtime-service-subscriptions-process",
1575 subscription_id
1576 );
1577
1578 let decoded_finalized_block = header::decode(
1579 &finalized_block.scale_encoded_header,
1580 background.sync_service.block_number_bytes(),
1581 )
1582 .unwrap();
1583
1584 let _prev_value = pinned_blocks.insert(
1585 (subscription_id, finalized_block.hash),
1586 PinnedBlock {
1587 runtime: tree.output_finalized_async_user_data().clone(),
1588 state_trie_root_hash: *decoded_finalized_block.state_root,
1589 block_number: decoded_finalized_block.number,
1590 block_ignores_limit: false,
1591 },
1592 );
1593 debug_assert!(_prev_value.is_none());
1594
1595 let mut non_finalized_blocks_ancestry_order =
1596 Vec::with_capacity(tree.num_input_non_finalized_blocks());
1597 for block in tree.input_output_iter_ancestry_order() {
1598 let runtime = match block.async_op_user_data {
1599 Some(rt) => rt.clone(),
1600 None => continue, // Runtime of that block not known yet, so it shouldn't be reported.
1601 };
1602
1603 let block_hash = block.user_data.hash;
1604 let parent_runtime = tree.parent(block.id).map_or(
1605 tree.output_finalized_async_user_data().clone(),
1606 |parent_idx| tree.block_async_user_data(parent_idx).unwrap().clone(),
1607 );
1608
1609 let parent_hash = *header::decode(
1610 &block.user_data.scale_encoded_header,
1611 background.sync_service.block_number_bytes(),
1612 )
1613 .unwrap()
1614 .parent_hash; // TODO: correct? if yes, document
1615 debug_assert!(
1616 parent_hash == finalized_block.hash
1617 || tree
1618 .input_output_iter_ancestry_order()
1619 .any(|b| parent_hash == b.user_data.hash
1620 && b.async_op_user_data.is_some())
1621 );
1622
1623 let decoded_header = header::decode(
1624 &block.user_data.scale_encoded_header,
1625 background.sync_service.block_number_bytes(),
1626 )
1627 .unwrap();
1628
1629 let _prev_value = pinned_blocks.insert(
1630 (subscription_id, block_hash),
1631 PinnedBlock {
1632 runtime: runtime.clone(),
1633 state_trie_root_hash: *decoded_header.state_root,
1634 block_number: decoded_header.number,
1635 block_ignores_limit: true,
1636 },
1637 );
1638 debug_assert!(_prev_value.is_none());
1639
1640 non_finalized_blocks_ancestry_order.push(BlockNotification {
1641 is_new_best: block.is_output_best,
1642 parent_hash,
1643 scale_encoded_header: block.user_data.scale_encoded_header.clone(),
1644 new_runtime: if !Arc::ptr_eq(&runtime, &parent_runtime) {
1645 Some(
1646 runtime
1647 .runtime
1648 .as_ref()
1649 .map(|rt| rt.runtime_version().clone())
1650 .map_err(|err| err.clone()),
1651 )
1652 } else {
1653 None
1654 },
1655 });
1656 }
1657
1658 debug_assert!(matches!(
1659 non_finalized_blocks_ancestry_order
1660 .iter()
1661 .filter(|b| b.is_new_best)
1662 .count(),
1663 0 | 1
1664 ));
1665
1666 all_blocks_subscriptions.insert(
1667 subscription_id,
1668 (tx, pending_subscription.max_pinned_blocks.get() - 1),
1669 );
1670
1671 let _ = pending_subscription.result_tx.send(SubscribeAll {
1672 finalized_block_scale_encoded_header: finalized_block
1673 .scale_encoded_header
1674 .clone(),
1675 finalized_block_runtime: tree
1676 .output_finalized_async_user_data()
1677 .runtime
1678 .as_ref()
1679 .map(|rt| rt.runtime_version().clone())
1680 .map_err(|err| err.clone()),
1681 non_finalized_blocks_ancestry_order,
1682 new_blocks: Subscription {
1683 subscription_id,
1684 channel: Box::pin(new_blocks_channel),
1685 to_background: background.to_background_tx.upgrade().unwrap(),
1686 },
1687 });
1688 }
1689
1690 WakeUpReason::SyncServiceSubscriptionReset => {
1691 // The sync service subscription has been or must be reset.
1692 log!(
1693 &background.platform,
1694 Trace,
1695 &background.log_target,
1696 "sync-subscription-reset"
1697 );
1698 background.blocks_stream = None;
1699 }
1700
1701 WakeUpReason::ForegroundClosed => {
1702 // Frontend and all subscriptions have shut down.
1703 log!(
1704 &background.platform,
1705 Debug,
1706 &background.log_target,
1707 "graceful-shutdown"
1708 );
1709 return;
1710 }
1711
1712 WakeUpReason::ToBackground(ToBackground::SubscribeAll(msg)) => {
1713 // Foreground wants to subscribe.
1714
1715 log!(
1716 &background.platform,
1717 Trace,
1718 &background.log_target,
1719 "runtime-service-subscription-requested"
1720 );
1721
1722 // In order to avoid potentially growing `pending_subscriptions` forever, we
1723 // remove senders that are closed. This is `O(n)`, but we expect this list to
1724 // be rather small.
1725 background
1726 .pending_subscriptions
1727 .retain(|s| !s.result_tx.is_canceled());
1728 background.pending_subscriptions.push_back(msg);
1729 }
1730
1731 WakeUpReason::ToBackground(ToBackground::CompileAndPinRuntime {
1732 result_tx,
1733 storage_code,
1734 storage_heap_pages,
1735 code_merkle_value,
1736 closest_ancestor_excluding,
1737 }) => {
1738 // Foreground wants to compile the given runtime.
1739
1740 // Try to find an existing identical runtime.
1741 let existing_runtime = background
1742 .runtimes
1743 .iter()
1744 .filter_map(|(_, rt)| rt.upgrade())
1745 .find(|rt| {
1746 rt.runtime_code == storage_code && rt.heap_pages == storage_heap_pages
1747 });
1748
1749 let runtime = if let Some(existing_runtime) = existing_runtime {
1750 log!(
1751 &background.platform,
1752 Trace,
1753 &background.log_target,
1754 "foreground-compile-and-pin-runtime-cache-hit"
1755 );
1756 existing_runtime
1757 } else {
1758 // No identical runtime was found. Try compiling the new runtime.
1759 let before_compilation = background.platform.now();
1760 let runtime = compile_runtime(
1761 &background.platform,
1762 &background.log_target,
1763 &storage_code,
1764 &storage_heap_pages,
1765 );
1766 let compilation_duration = background.platform.now() - before_compilation;
1767 log!(
1768 &background.platform,
1769 Debug,
1770 &background.log_target,
1771 "foreground-compile-and-pin-runtime-cache-miss",
1772 ?compilation_duration,
1773 compilation_success = runtime.is_ok()
1774 );
1775 let runtime = Arc::new(Runtime {
1776 heap_pages: storage_heap_pages,
1777 runtime_code: storage_code,
1778 code_merkle_value,
1779 closest_ancestor_excluding,
1780 runtime,
1781 });
1782 background.runtimes.insert(Arc::downgrade(&runtime));
1783 runtime
1784 };
1785
1786 let _ = result_tx.send(runtime);
1787 }
1788
1789 WakeUpReason::ToBackground(ToBackground::FinalizedRuntimeStorageMerkleValues {
1790 result_tx,
1791 }) => {
1792 // Foreground wants the finalized runtime storage Merkle values.
1793
1794 log!(
1795 &background.platform,
1796 Trace,
1797 &background.log_target,
1798 "foreground-finalized-runtime-storage-merkle-values"
1799 );
1800
1801 let _ = result_tx.send(
1802 if let Tree::FinalizedBlockRuntimeKnown { tree, .. } = &background.tree {
1803 let runtime = &tree.output_finalized_async_user_data();
1804 Some((
1805 runtime.runtime_code.clone(),
1806 runtime.code_merkle_value.clone(),
1807 runtime.closest_ancestor_excluding.clone(),
1808 ))
1809 } else {
1810 None
1811 },
1812 );
1813 }
1814
1815 WakeUpReason::ToBackground(ToBackground::IsNearHeadOfChainHeuristic { result_tx }) => {
1816 // Foreground wants to query whether we are at the head of the chain.
1817
1818 log!(
1819 &background.platform,
1820 Trace,
1821 &background.log_target,
1822 "foreground-is-near-head-of-chain-heuristic"
1823 );
1824
1825 // If we aren't subscribed to the sync service yet, we notify that we are not
1826 // near the head of the chain.
1827 if background.blocks_stream.is_none() {
1828 let _ = result_tx.send(false);
1829 continue;
1830 }
1831
1832 // Check whether any runtime has been downloaded yet. If not, we notify that
1833 // we're not near the head of the chain.
1834 let Tree::FinalizedBlockRuntimeKnown {
1835 tree,
1836 finalized_block,
1837 ..
1838 } = &background.tree
1839 else {
1840 let _ = result_tx.send(false);
1841 continue;
1842 };
1843
1844 // The runtime service head might be close to the sync service head, but if the
1845 // sync service is far away from the head of the chain, then the runtime service
1846 // is necessarily also far away.
1847 if !background
1848 .sync_service
1849 .is_near_head_of_chain_heuristic()
1850 .await
1851 {
1852 let _ = result_tx.send(false);
1853 continue;
1854 }
1855
// If the input best block (i.e. what the sync service feeds us) is equal to
// the output finalized block (i.e. what the runtime service has downloaded), we
// are at the very head of the chain.
1859 let Some(input_best) = tree.input_best_block_index() else {
1860 let _ = result_tx.send(true);
1861 continue;
1862 };
1863
1864 // We consider ourselves as being at the head of the chain if the
1865 // distance between the output tree best (i.e. whose runtime has
1866 // been downloaded) and the input tree best (i.e. what the sync service
1867 // feeds us) is smaller than a certain number of blocks.
1868 // Note that the input best can have a smaller block height than the
1869 // output, for example in case of reorg.
1870 let is_near = tree[input_best].height.saturating_sub(
1871 tree.output_best_block_index()
1872 .map_or(finalized_block.height, |(idx, _)| tree[idx].height),
1873 ) <= 12;
1874 let _ = result_tx.send(is_near);
1875 }
1876
1877 WakeUpReason::ToBackground(ToBackground::UnpinBlock {
1878 result_tx,
1879 subscription_id,
1880 block_hash,
1881 }) => {
1882 // Foreground wants a block unpinned.
1883
1884 log!(
1885 &background.platform,
1886 Trace,
1887 &background.log_target,
1888 "foreground-unpin-block",
1889 subscription_id = subscription_id.0,
1890 block_hash = HashDisplay(&block_hash)
1891 );
1892
1893 if let Tree::FinalizedBlockRuntimeKnown {
1894 all_blocks_subscriptions,
1895 pinned_blocks,
1896 ..
1897 } = &mut background.tree
1898 {
1899 let block_ignores_limit = match pinned_blocks
1900 .remove(&(subscription_id.0, block_hash))
1901 {
1902 Some(b) => b.block_ignores_limit,
1903 None => {
// Cold path.
1905 if let Some((_, _)) = all_blocks_subscriptions.get(&subscription_id.0) {
1906 let _ = result_tx.send(Err(()));
1907 } else {
1908 let _ = result_tx.send(Ok(()));
1909 }
1910 continue;
1911 }
1912 };
1913
1914 background.runtimes.retain(|_, rt| rt.strong_count() > 0);
1915
1916 if !block_ignores_limit {
1917 let (_, finalized_pinned_remaining) = all_blocks_subscriptions
1918 .get_mut(&subscription_id.0)
1919 .unwrap();
1920 *finalized_pinned_remaining += 1;
1921 }
1922 }
1923
1924 let _ = result_tx.send(Ok(()));
1925 }
1926
1927 WakeUpReason::ToBackground(ToBackground::PinPinnedBlockRuntime {
1928 result_tx,
1929 subscription_id,
1930 block_hash,
1931 }) => {
1932 // Foreground wants to pin the runtime of a pinned block.
1933
1934 log!(
1935 &background.platform,
1936 Trace,
1937 &background.log_target,
1938 "foreground-pin-pinned-block-runtime",
1939 subscription_id = subscription_id.0,
1940 block_hash = HashDisplay(&block_hash)
1941 );
1942
1943 let pinned_block = {
1944 if let Tree::FinalizedBlockRuntimeKnown {
1945 all_blocks_subscriptions,
1946 pinned_blocks,
1947 ..
1948 } = &mut background.tree
1949 {
1950 match pinned_blocks.get(&(subscription_id.0, block_hash)) {
1951 Some(v) => v.clone(),
1952 None => {
1953 // Cold path.
1954 if let Some((_, _)) =
1955 all_blocks_subscriptions.get(&subscription_id.0)
1956 {
1957 let _ = result_tx
1958 .send(Err(PinPinnedBlockRuntimeError::BlockNotPinned));
1959 } else {
1960 let _ = result_tx.send(Err(
1961 PinPinnedBlockRuntimeError::ObsoleteSubscription,
1962 ));
1963 }
1964 continue;
1965 }
1966 }
1967 } else {
1968 let _ =
1969 result_tx.send(Err(PinPinnedBlockRuntimeError::ObsoleteSubscription));
1970 continue;
1971 }
1972 };
1973
1974 let _ = result_tx.send(Ok((
1975 pinned_block.runtime.clone(),
1976 pinned_block.state_trie_root_hash,
1977 pinned_block.block_number,
1978 )));
1979 }
1980
1981 WakeUpReason::ToBackground(ToBackground::RuntimeCall {
1982 result_tx,
1983 pinned_runtime,
1984 block_hash,
1985 block_number,
1986 block_state_trie_root_hash,
1987 function_name,
1988 required_api_version,
1989 parameters_vectored,
1990 total_attempts,
1991 timeout_per_request,
1992 _max_parallel: _, // TODO: unused /!\
1993 }) => {
1994 // Foreground wants to perform a runtime call.
1995
1996 log!(
1997 &background.platform,
1998 Debug,
1999 &background.log_target,
2000 "foreground-runtime-call-start",
2001 block_hash = HashDisplay(&block_hash),
2002 block_number,
2003 block_state_trie_root_hash = HashDisplay(&block_state_trie_root_hash),
2004 function_name,
2005 ?required_api_version,
2006 parameters_vectored = HashDisplay(¶meters_vectored),
2007 total_attempts,
2008 ?timeout_per_request
2009 );
2010
2011 let runtime = match &pinned_runtime.runtime {
2012 Ok(rt) => rt.clone(),
2013 Err(error) => {
2014 // The runtime call can't succeed because smoldot was incapable of
2015 // compiling the runtime.
2016 log!(
2017 &background.platform,
2018 Trace,
2019 &background.log_target,
2020 "foreground-runtime-call-abort",
2021 block_hash = HashDisplay(&block_hash),
2022 error = "invalid-runtime"
2023 );
2024 let _ =
2025 result_tx.send(Err(RuntimeCallError::InvalidRuntime(error.clone())));
2026 continue;
2027 }
2028 };
2029
2030 let api_version =
2031 if let Some((api_name, api_version_required)) = required_api_version {
2032 let api_version_if_fulfilled = runtime
2033 .runtime_version()
2034 .decode()
2035 .apis
2036 .find_version(&api_name)
2037 .filter(|api_version| api_version_required.contains(api_version));
2038
2039 let Some(api_version) = api_version_if_fulfilled else {
2040 // API version required by caller isn't fulfilled.
2041 log!(
2042 &background.platform,
2043 Trace,
2044 &background.log_target,
2045 "foreground-runtime-call-abort",
2046 block_hash = HashDisplay(&block_hash),
2047 error = "api-version-requirement-unfulfilled"
2048 );
2049 let _ = result_tx
2050 .send(Err(RuntimeCallError::ApiVersionRequirementUnfulfilled));
2051 continue;
2052 };
2053
2054 Some(api_version)
2055 } else {
2056 None
2057 };
2058
2059 background
2060 .progress_runtime_call_requests
2061 .push(Box::pin(async move {
2062 ProgressRuntimeCallRequest::Initialize(RuntimeCallRequest {
2063 block_hash,
2064 block_number,
2065 block_state_trie_root_hash,
2066 function_name,
2067 api_version,
2068 parameters_vectored,
2069 runtime,
2070 total_attempts,
2071 timeout_per_request,
2072 inaccessible_errors: Vec::with_capacity(cmp::min(
2073 16,
2074 usize::try_from(total_attempts).unwrap_or(usize::MAX),
2075 )),
2076 result_tx,
2077 })
2078 }));
2079 }
2080
2081 WakeUpReason::ProgressRuntimeCallRequest(progress) => {
2082 let (mut operation, call_proof_and_sender) = match progress {
2083 ProgressRuntimeCallRequest::Initialize(operation) => (operation, None),
2084 ProgressRuntimeCallRequest::CallProofRequestDone {
2085 result: Ok(proof),
2086 call_proof_sender,
2087 operation,
2088 } => (operation, Some((proof, call_proof_sender))),
2089 ProgressRuntimeCallRequest::CallProofRequestDone {
2090 result: Err(error),
2091 mut operation,
2092 call_proof_sender,
2093 } => {
2094 log!(
2095 &background.platform,
2096 Trace,
2097 &background.log_target,
2098 "foreground-runtime-call-progress-fail",
2099 block_hash = HashDisplay(&operation.block_hash),
2100 function_name = operation.function_name,
2101 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2102 remaining_attempts = usize::try_from(operation.total_attempts).unwrap()
2103 - operation.inaccessible_errors.len()
2104 - 1,
2105 ?error
2106 );
2107 operation
2108 .inaccessible_errors
2109 .push(RuntimeCallInaccessibleError::Network(error));
2110 background
2111 .network_service
2112 .ban_and_disconnect(
2113 call_proof_sender,
2114 network_service::BanSeverity::Low,
2115 "call-proof-request-failed",
2116 )
2117 .await;
2118 (operation, None)
2119 }
2120 };
2121
2122 // If the foreground is no longer interested in the result, abort now in order to
2123 // save resources.
2124 if operation.result_tx.is_canceled() {
2125 continue;
2126 }
2127
2128 // Process the call proof.
2129 if let Some((call_proof, call_proof_sender)) = call_proof_and_sender {
2130 match runtime_call_single_attempt(
2131 &background.platform,
2132 operation.runtime.clone(),
2133 &operation.function_name,
2134 &operation.parameters_vectored,
2135 &operation.block_state_trie_root_hash,
2136 call_proof.decode(),
2137 )
2138 .await
2139 {
2140 (timing, Ok(output)) => {
2141 // Execution finished successfully.
2142 // This is the happy path.
2143 log!(
2144 &background.platform,
2145 Debug,
2146 &background.log_target,
2147 "foreground-runtime-call-success",
2148 block_hash = HashDisplay(&operation.block_hash),
2149 function_name = operation.function_name,
2150 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2151 output = HashDisplay(&output),
2152 virtual_machine_call_duration = ?timing.virtual_machine_call_duration,
2153 proof_access_duration = ?timing.proof_access_duration,
2154 );
2155 let _ = operation.result_tx.send(Ok(RuntimeCallSuccess {
2156 output,
2157 api_version: operation.api_version,
2158 }));
2159 continue;
2160 }
2161 (timing, Err(SingleRuntimeCallAttemptError::Execution(error))) => {
2162 log!(
2163 &background.platform,
2164 Debug,
2165 &background.log_target,
2166 "foreground-runtime-call-fail",
2167 block_hash = HashDisplay(&operation.block_hash),
2168 function_name = operation.function_name,
2169 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2170 ?error,
2171 virtual_machine_call_duration = ?timing.virtual_machine_call_duration,
2172 proof_access_duration = ?timing.proof_access_duration,
2173 );
2174 let _ = operation
2175 .result_tx
2176 .send(Err(RuntimeCallError::Execution(error)));
2177 continue;
2178 }
2179 (timing, Err(SingleRuntimeCallAttemptError::Inaccessible(error))) => {
2180 // This path is reached only if the call proof was invalid.
2181 log!(
2182 &background.platform,
2183 Debug,
2184 &background.log_target,
2185 "foreground-runtime-call-progress-invalid-call-proof",
2186 block_hash = HashDisplay(&operation.block_hash),
2187 function_name = operation.function_name,
2188 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2189 remaining_attempts = usize::try_from(operation.total_attempts)
2190 .unwrap()
2191 - operation.inaccessible_errors.len()
2192 - 1,
2193 ?error,
2194 virtual_machine_call_duration = ?timing.virtual_machine_call_duration,
2195 proof_access_duration = ?timing.proof_access_duration,
2196 );
2197 operation.inaccessible_errors.push(error);
2198 background
2199 .network_service
2200 .ban_and_disconnect(
2201 call_proof_sender,
2202 network_service::BanSeverity::High,
2203 "invalid-call-proof",
2204 )
2205 .await;
2206 }
2207 }
2208 }
2209
2210 // If we have failed to obtain a valid proof several times, abort the runtime
2211 // call attempt altogether.
2212 if u32::try_from(operation.inaccessible_errors.len()).unwrap_or(u32::MAX)
2213 >= operation.total_attempts
2214 {
2215 // No log line is printed here because one is already printed earlier.
2216 let _ = operation.result_tx.send(Err(RuntimeCallError::Inaccessible(
2217 operation.inaccessible_errors,
2218 )));
2219 continue;
2220 }
2221
2222 // This can be reached if the call proof was invalid or absent. We must start a
2223 // new call proof request.
2224
2225 // Choose peer to query.
2226 // TODO: better peer selection
2227 // TODO: can there be a race condition where the sync service forgets that a peer has knowledge of a block? shouldn't we somehow cache the peers that know this block ahead of time or something?
2228 let Some(call_proof_target) = background
2229 .sync_service
2230 .peers_assumed_know_blocks(operation.block_number, &operation.block_hash)
2231 .await
2232 .choose(&mut rand_chacha::ChaCha20Rng::from_seed({
2233 // TODO: hacky
2234 let mut seed = [0; 32];
2235 background.platform.fill_random_bytes(&mut seed);
2236 seed
2237 }))
2238 else {
2239 // No peer knows this block. Returning with a failure.
2240 log!(
2241 &background.platform,
2242 Debug,
2243 &background.log_target,
2244 "foreground-runtime-call-request-fail",
2245 block_hash = HashDisplay(&operation.block_hash),
2246 function_name = operation.function_name,
2247 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2248 error = "no-peer-for-call-request"
2249 );
2250 let _ = operation.result_tx.send(Err(RuntimeCallError::Inaccessible(
2251 operation.inaccessible_errors,
2252 )));
2253 continue;
2254 };
2255
2256 log!(
2257 &background.platform,
2258 Trace,
2259 &background.log_target,
2260 "foreground-runtime-call-request-start",
2261 block_hash = HashDisplay(&operation.block_hash),
2262 function_name = operation.function_name,
2263 parameters_vectored = HashDisplay(&operation.parameters_vectored),
2264 call_proof_target,
2265 );
2266
2267 // Start the request.
2268 background.progress_runtime_call_requests.push(Box::pin({
2269 let call_proof_request_future =
2270 background.network_service.clone().call_proof_request(
2271 call_proof_target.clone(),
2272 network_service::CallProofRequestConfig {
2273 block_hash: operation.block_hash,
2274 method: Cow::Owned(operation.function_name.clone()), // TODO: overhead
2275 parameter_vectored: iter::once(
2276 operation.parameters_vectored.clone(),
2277 ), // TODO: overhead
2278 },
2279 operation.timeout_per_request,
2280 );
2281
2282 async move {
2283 let result = call_proof_request_future.await;
2284 ProgressRuntimeCallRequest::CallProofRequestDone {
2285 result,
2286 operation,
2287 call_proof_sender: call_proof_target,
2288 }
2289 }
2290 }));
2291 }
2292
2293 WakeUpReason::Notification(sync_service::Notification::Block(new_block)) => {
2294 // Sync service has reported a new block.
2295
2296 let same_runtime_as_parent = same_runtime_as_parent(
2297 &new_block.scale_encoded_header,
2298 background.sync_service.block_number_bytes(),
2299 );
2300
2301 if same_runtime_as_parent {
2302 log!(
2303 &background.platform,
2304 Trace,
2305 &background.log_target,
2306 "input-chain-new-block",
2307 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2308 &new_block.scale_encoded_header
2309 )),
2310 parent_block_hash = HashDisplay(&new_block.parent_hash),
2311 is_new_best = new_block.is_new_best,
2312 same_runtime_as_parent = true
2313 );
2314 } else {
2315 log!(
2316 &background.platform,
2317 Debug,
2318 &background.log_target,
2319 "input-chain-new-block-runtime-upgrade",
2320 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2321 &new_block.scale_encoded_header
2322 )),
2323 parent_block_hash = HashDisplay(&new_block.parent_hash),
2324 is_new_best = new_block.is_new_best
2325 );
2326 }
2327
2328 match &mut background.tree {
2329 Tree::FinalizedBlockRuntimeKnown {
2330 tree,
2331 finalized_block,
2332 ..
2333 } => {
2334 let parent_index = if new_block.parent_hash == finalized_block.hash {
2335 None
2336 } else {
2337 Some(
2338 // TODO: O(n)
2339 tree.input_output_iter_unordered()
2340 .find(|block| block.user_data.hash == new_block.parent_hash)
2341 .unwrap()
2342 .id,
2343 )
2344 };
2345
2346 tree.input_insert_block(
2347 Block {
2348 hash: header::hash_from_scale_encoded_header(
2349 &new_block.scale_encoded_header,
2350 ),
2351 height: header::decode(
2352 &new_block.scale_encoded_header,
2353 background.sync_service.block_number_bytes(),
2354 )
2355 .unwrap()
2356 .number, // TODO: consider feeding the information from the sync service?
2357 scale_encoded_header: new_block.scale_encoded_header,
2358 },
2359 parent_index,
2360 same_runtime_as_parent,
2361 new_block.is_new_best,
2362 );
2363 }
2364 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2365 // TODO: O(n)
2366 let parent_index = tree
2367 .input_output_iter_unordered()
2368 .find(|block| block.user_data.hash == new_block.parent_hash)
2369 .unwrap()
2370 .id;
2371 tree.input_insert_block(
2372 Block {
2373 hash: header::hash_from_scale_encoded_header(
2374 &new_block.scale_encoded_header,
2375 ),
2376 height: header::decode(
2377 &new_block.scale_encoded_header,
2378 background.sync_service.block_number_bytes(),
2379 )
2380 .unwrap()
2381 .number, // TODO: consider feeding the information from the sync service?
2382 scale_encoded_header: new_block.scale_encoded_header,
2383 },
2384 Some(parent_index),
2385 same_runtime_as_parent,
2386 new_block.is_new_best,
2387 );
2388 }
2389 }
2390 }
2391
2392 WakeUpReason::Notification(sync_service::Notification::Finalized {
2393 hash,
2394 best_block_hash_if_changed,
2395 ..
2396 }) => {
2397 // Sync service has reported a finalized block.
2398
2399 log!(
2400 &background.platform,
2401 Trace,
2402 &background.log_target,
2403 "input-chain-finalized",
2404 block_hash = HashDisplay(&hash),
2405 best_block_hash = if let Some(best_block_hash) = best_block_hash_if_changed {
2406 Cow::Owned(HashDisplay(&best_block_hash).to_string())
2407 } else {
2408 Cow::Borrowed("<unchanged>")
2409 }
2410 );
2411
2412 if let Some(best_block_hash) = best_block_hash_if_changed {
2413 match &mut background.tree {
2414 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
2415 let new_best_block = tree
2416 .input_output_iter_unordered()
2417 .find(|block| block.user_data.hash == best_block_hash)
2418 .unwrap()
2419 .id;
2420 tree.input_set_best_block(Some(new_best_block));
2421 }
2422 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2423 let new_best_block = tree
2424 .input_output_iter_unordered()
2425 .find(|block| block.user_data.hash == best_block_hash)
2426 .unwrap()
2427 .id;
2428 tree.input_set_best_block(Some(new_best_block));
2429 }
2430 }
2431 }
2432
2433 match &mut background.tree {
2434 Tree::FinalizedBlockRuntimeKnown {
2435 tree,
2436 finalized_block,
2437 ..
2438 } => {
2439 debug_assert_ne!(finalized_block.hash, hash);
2440 let node_to_finalize = tree
2441 .input_output_iter_unordered()
2442 .find(|block| block.user_data.hash == hash)
2443 .unwrap()
2444 .id;
2445 tree.input_finalize(node_to_finalize);
2446 }
2447 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2448 let node_to_finalize = tree
2449 .input_output_iter_unordered()
2450 .find(|block| block.user_data.hash == hash)
2451 .unwrap()
2452 .id;
2453 tree.input_finalize(node_to_finalize);
2454 }
2455 }
2456 }
2457
2458 WakeUpReason::Notification(sync_service::Notification::BestBlockChanged { hash }) => {
2459 // Sync service has reported a change in the best block.
2460
2461 log!(
2462 &background.platform,
2463 Trace,
2464 &background.log_target,
2465 "input-chain-best-block-update",
2466 block_hash = HashDisplay(&hash)
2467 );
2468
2469 match &mut background.tree {
2470 Tree::FinalizedBlockRuntimeKnown {
2471 finalized_block,
2472 tree,
2473 ..
2474 } => {
2475 let idx = if hash == finalized_block.hash {
2476 None
2477 } else {
2478 Some(
2479 tree.input_output_iter_unordered()
2480 .find(|block| block.user_data.hash == hash)
2481 .unwrap()
2482 .id,
2483 )
2484 };
2485 tree.input_set_best_block(idx);
2486 }
2487 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2488 let idx = tree
2489 .input_output_iter_unordered()
2490 .find(|block| block.user_data.hash == hash)
2491 .unwrap()
2492 .id;
2493 tree.input_set_best_block(Some(idx));
2494 }
2495 }
2496 }
2497
2498 WakeUpReason::RuntimeDownloadFinished(
2499 async_op_id,
2500 Ok((
2501 storage_code,
2502 storage_heap_pages,
2503 code_merkle_value,
2504 closest_ancestor_excluding,
2505 )),
2506 ) => {
2507 // A runtime has successfully finished downloading.
2508
2509 let concerned_blocks = match &background.tree {
2510 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
2511 either::Left(tree.async_op_blocks(async_op_id))
2512 }
2513 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2514 either::Right(tree.async_op_blocks(async_op_id))
2515 }
2516 }
2517 .format_with(", ", |block, fmt| fmt(&HashDisplay(&block.hash)))
2518 .to_string();
2519
2520 // Try to find an existing runtime identical to the one that has just been
// downloaded. This loop is `O(n)`, but given that we expect this list to be very
2522 // small (at most 1 or 2 elements), this is not a problem.
2523 let existing_runtime = background
2524 .runtimes
2525 .iter()
2526 .filter_map(|(_, rt)| rt.upgrade())
2527 .find(|rt| {
2528 rt.runtime_code == storage_code && rt.heap_pages == storage_heap_pages
2529 });
2530
2531 // If no identical runtime was found, try compiling the runtime.
2532 let runtime = if let Some(existing_runtime) = existing_runtime {
2533 log!(
2534 &background.platform,
2535 Debug,
2536 &background.log_target,
2537 "runtime-download-finish-compilation-cache-hit",
2538 block_hashes = concerned_blocks,
2539 );
2540 existing_runtime
2541 } else {
2542 let before_compilation = background.platform.now();
2543 let runtime = compile_runtime(
2544 &background.platform,
2545 &background.log_target,
2546 &storage_code,
2547 &storage_heap_pages,
2548 );
2549 let compilation_duration = background.platform.now() - before_compilation;
2550 log!(
2551 &background.platform,
2552 Debug,
2553 &background.log_target,
2554 "runtime-download-finish-compilation-cache-miss",
2555 ?compilation_duration,
2556 compilation_success = runtime.is_ok(),
2557 block_hashes = concerned_blocks,
2558 );
2559 match &runtime {
2560 Ok(runtime) => {
2561 log!(
2562 &background.platform,
2563 Info,
2564 &background.log_target,
2565 format!(
2566 "Successfully compiled runtime. Spec version: {}. \
2567 Size of `:code`: {}.",
2568 runtime.runtime_version().decode().spec_version,
2569 BytesDisplay(
2570 u64::try_from(storage_code.as_ref().map_or(0, |v| v.len()))
2571 .unwrap()
2572 )
2573 )
2574 );
2575 }
2576 Err(error) => {
2577 log!(
2578 &background.platform,
2579 Warn,
2580 &background.log_target,
2581 format!(
2582 "Failed to compile runtime. Size of `:code`: {}.\nError: {}\n\
2583 This indicates an incompatibility between smoldot and \
2584 the chain.",
2585 BytesDisplay(
2586 u64::try_from(storage_code.as_ref().map_or(0, |v| v.len()))
2587 .unwrap()
2588 ),
2589 error
2590 )
2591 );
2592 }
2593 }
2594
2595 let runtime = Arc::new(Runtime {
2596 heap_pages: storage_heap_pages,
2597 runtime_code: storage_code,
2598 runtime,
2599 code_merkle_value,
2600 closest_ancestor_excluding,
2601 });
2602
2603 background.runtimes.insert(Arc::downgrade(&runtime));
2604 runtime
2605 };
2606
2607 // Insert the runtime into the tree.
2608 match &mut background.tree {
2609 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
2610 tree.async_op_finished(async_op_id, runtime);
2611 }
2612 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2613 tree.async_op_finished(async_op_id, Some(runtime));
2614 }
2615 }
2616 }
2617
2618 WakeUpReason::RuntimeDownloadFinished(async_op_id, Err(error)) => {
2619 // A runtime download has failed.
2620
2621 let concerned_blocks = match &background.tree {
2622 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
2623 either::Left(tree.async_op_blocks(async_op_id))
2624 }
2625 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2626 either::Right(tree.async_op_blocks(async_op_id))
2627 }
2628 }
2629 .format_with(", ", |block, fmt| fmt(&HashDisplay(&block.hash)))
2630 .to_string();
2631
2632 log!(
2633 &background.platform,
2634 Debug,
2635 &background.log_target,
2636 "runtime-download-error",
2637 block_hashes = concerned_blocks,
2638 ?error
2639 );
2640 if !error.is_network_problem() {
2641 log!(
2642 &background.platform,
2643 Warn,
2644 &background.log_target,
2645 format!(
2646 "Failed to download :code and :heappages of blocks {}: {}",
2647 concerned_blocks, error
2648 )
2649 );
2650 }
2651
2652 match &mut background.tree {
2653 Tree::FinalizedBlockRuntimeKnown { tree, .. } => {
2654 tree.async_op_failure(async_op_id, &background.platform.now());
2655 }
2656 Tree::FinalizedBlockRuntimeUnknown { tree, .. } => {
2657 tree.async_op_failure(async_op_id, &background.platform.now());
2658 }
2659 }
2660 }
2661 }
2662 }
2663}
2664
2665#[derive(Debug, Clone, derive_more::Display, derive_more::Error)]
2666enum RuntimeDownloadError {
2667 #[display("{_0}")]
2668 StorageQuery(sync_service::StorageQueryError),
2669 #[display("Couldn't decode header: {_0}")]
2670 InvalidHeader(header::Error),
2671}
2672
2673impl RuntimeDownloadError {
2674 /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
2675 /// issue.
2676 fn is_network_problem(&self) -> bool {
2677 match self {
2678 RuntimeDownloadError::StorageQuery(err) => err.is_network_problem(),
2679 RuntimeDownloadError::InvalidHeader(_) => false,
2680 }
2681 }
2682}
2683
2684struct Background<TPlat: PlatformRef> {
2685 /// Target to use for all the logs of this service.
2686 log_target: String,
2687
2688 /// See [`Config::platform`].
2689 platform: TPlat,
2690
2691 /// See [`Config::sync_service`].
2692 sync_service: Arc<sync_service::SyncService<TPlat>>,
2693
2694 /// See [`Config::network_service`].
2695 network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
2696
2697 /// Receiver for messages to the background task.
2698 to_background: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
2699
2700 /// Sending side of [`Background::to_background`].
2701 to_background_tx: async_channel::WeakSender<ToBackground<TPlat>>,
2702
2703 /// Identifier of the next subscription for
2704 /// [`Tree::FinalizedBlockRuntimeKnown::all_blocks_subscriptions`].
2705 ///
2706 /// To avoid race conditions, subscription IDs are never used, even if we switch back to
2707 /// [`Tree::FinalizedBlockRuntimeUnknown`].
2708 next_subscription_id: u64,
2709
2710 /// List of runtimes referenced by the tree in [`Tree`] and by
2711 /// [`Tree::FinalizedBlockRuntimeKnown::pinned_blocks`].
2712 ///
2713 /// Might contains obsolete values (i.e. stale `Weak`s) and thus must be cleaned from time to
2714 /// time.
2715 ///
2716 /// Because this list shouldn't contain many entries, it is acceptable to iterate over all
2717 /// the elements.
2718 runtimes: slab::Slab<Weak<Runtime>>,
2719
2720 /// Tree of blocks received from the sync service. Keeps track of which block has been
2721 /// reported to the outer API.
2722 tree: Tree<TPlat>,
2723
2724 /// List of subscription attempts started with
2725 /// [`Tree::FinalizedBlockRuntimeKnown::all_blocks_subscriptions`].
2726 ///
2727 /// When in the [`Tree::FinalizedBlockRuntimeKnown`] state, a [`SubscribeAll`] is constructed
2728 /// and sent back for each of these senders.
2729 /// When in the [`Tree::FinalizedBlockRuntimeUnknown`] state, the senders patiently wait here.
2730 pending_subscriptions: VecDeque<ToBackgroundSubscribeAll<TPlat>>,
2731
2732 /// Stream of notifications coming from the sync service. `None` if not subscribed yet.
2733 blocks_stream: Option<Pin<Box<dyn Stream<Item = sync_service::Notification> + Send>>>,
2734
2735 /// List of runtimes currently being downloaded from the network.
2736 /// For each item, the download id, storage value of `:code`, storage value of `:heappages`,
2737 /// and Merkle value and closest ancestor of `:code`.
2738 // TODO: use struct
2739 runtime_downloads: stream::FuturesUnordered<
2740 future::BoxFuture<
2741 'static,
2742 (
2743 async_tree::AsyncOpId,
2744 Result<
2745 (
2746 Option<Vec<u8>>,
2747 Option<Vec<u8>>,
2748 Option<Vec<u8>>,
2749 Option<Vec<Nibble>>,
2750 ),
2751 RuntimeDownloadError,
2752 >,
2753 ),
2754 >,
2755 >,
2756
2757 /// List of actions to perform to progress runtime calls requested by the frontend.
2758 progress_runtime_call_requests:
2759 stream::FuturesUnordered<future::BoxFuture<'static, ProgressRuntimeCallRequest>>,
2760}
2761
2762enum Tree<TPlat: PlatformRef> {
2763 FinalizedBlockRuntimeKnown {
2764 /// Tree of blocks. Holds the state of the download of everything. Always `Some` when the
2765 /// `Mutex` is being locked. Temporarily switched to `None` during some operations.
2766 ///
2767 /// The asynchronous operation user data is a `usize` corresponding to the index within
2768 /// [`Background::runtimes`].
2769 tree: async_tree::AsyncTree<TPlat::Instant, Block, Arc<Runtime>>,
2770
2771 /// Finalized block. Outside of the tree.
2772 finalized_block: Block,
2773
2774 /// List of senders that get notified when new blocks arrive.
2775 /// See [`RuntimeService::subscribe_all`]. Alongside with each sender, the number of pinned
2776 /// finalized or non-canonical blocks remaining for this subscription.
2777 ///
2778 /// Keys are assigned from [`Background::next_subscription_id`].
2779 all_blocks_subscriptions: hashbrown::HashMap<
2780 u64,
2781 (async_channel::Sender<Notification>, usize),
2782 fnv::FnvBuildHasher,
2783 >,
2784
2785 /// List of pinned blocks.
2786 ///
2787 /// Every time a block is reported to the API user, it is inserted in this map. The block
2788 /// is inserted after it has been pushed in the channel, but before it is pulled.
2789 /// Therefore, if the channel is closed it is the background that needs to purge all
2790 /// blocks from this container that are no longer relevant.
2791 ///
2792 /// Keys are `(subscription_id, block_hash)`. Values are indices within
2793 /// [`Background::runtimes`], state trie root hashes, block numbers, and whether the block
2794 /// is non-finalized and part of the canonical chain.
2795 pinned_blocks: BTreeMap<(u64, [u8; 32]), PinnedBlock>,
2796 },
2797 FinalizedBlockRuntimeUnknown {
2798 /// Tree of blocks. Holds the state of the download of everything. Always `Some` when the
2799 /// `Mutex` is being locked. Temporarily switched to `None` during some operations.
2800 ///
2801 /// The finalized block according to the [`async_tree::AsyncTree`] is actually a dummy.
2802 /// The "real" finalized block is a non-finalized block within this tree.
2803 ///
2804 /// The asynchronous operation user data is a `usize` corresponding to the index within
2805 /// [`Background::runtimes`]. The asynchronous operation user data is `None` for the dummy
2806 /// finalized block.
2807 // TODO: explain better
2808 tree: async_tree::AsyncTree<TPlat::Instant, Block, Option<Arc<Runtime>>>,
2809 },
2810}
2811
2812/// See [`Background::progress_runtime_call_requests`].
2813enum ProgressRuntimeCallRequest {
2814 /// Must start the first call proof request.
2815 Initialize(RuntimeCallRequest),
2816 /// A call proof request has finished and the runtime call can be advanced.
2817 CallProofRequestDone {
2818 /// Outcome of the latest call proof request.
2819 result: Result<network_service::EncodedMerkleProof, network_service::CallProofRequestError>,
2820 /// Identity of the peer the call proof request was made against.
2821 call_proof_sender: network_service::PeerId,
2822 operation: RuntimeCallRequest,
2823 },
2824}
2825
2826/// See [`ProgressRuntimeCallRequest`].
2827struct RuntimeCallRequest {
2828 block_hash: [u8; 32],
2829 block_number: u64,
2830 block_state_trie_root_hash: [u8; 32],
2831 function_name: String,
2832 /// Version of the API that was found. `Some` if and only if an API requirement was passed.
2833 api_version: Option<u32>,
2834 parameters_vectored: Vec<u8>,
2835 runtime: executor::host::HostVmPrototype,
2836 total_attempts: u32,
2837 timeout_per_request: Duration,
2838 inaccessible_errors: Vec<RuntimeCallInaccessibleError>,
2839 result_tx: oneshot::Sender<Result<RuntimeCallSuccess, RuntimeCallError>>,
2840}
2841
2842struct Runtime {
2843 /// Successfully-compiled runtime and all its information. Can contain an error if an error
2844 /// happened, including a problem when obtaining the runtime specs.
2845 runtime: Result<executor::host::HostVmPrototype, RuntimeError>,
2846
2847 /// Merkle value of the `:code` trie node.
2848 ///
2849 /// Can be `None` if the storage is empty, in which case the runtime will have failed to
2850 /// build.
2851 code_merkle_value: Option<Vec<u8>>,
2852
2853 /// Closest ancestor of the `:code` key except for `:code` itself.
2854 closest_ancestor_excluding: Option<Vec<Nibble>>,
2855
2856 /// Undecoded storage value of `:code` corresponding to the [`Runtime::runtime`]
2857 /// field.
2858 ///
2859 /// Can be `None` if the storage is empty, in which case the runtime will have failed to
2860 /// build.
2861 // TODO: consider storing hash instead
2862 runtime_code: Option<Vec<u8>>,
2863
2864 /// Undecoded storage value of `:heappages` corresponding to the
2865 /// [`Runtime::runtime`] field.
2866 ///
2867 /// Can be `None` if the storage is empty, in which case the runtime will have failed to
2868 /// build.
2869 // TODO: consider storing hash instead
2870 heap_pages: Option<Vec<u8>>,
2871}
2872
2873fn compile_runtime<TPlat: PlatformRef>(
2874 platform: &TPlat,
2875 log_target: &str,
2876 code: &Option<Vec<u8>>,
2877 heap_pages: &Option<Vec<u8>>,
2878) -> Result<executor::host::HostVmPrototype, RuntimeError> {
2879 // Parameters for `HostVmPrototype::new`.
2880 let module = code.as_ref().ok_or(RuntimeError::CodeNotFound)?;
2881 let heap_pages = executor::storage_heap_pages_to_value(heap_pages.as_deref())
2882 .map_err(RuntimeError::InvalidHeapPages)?;
2883 let exec_hint = executor::vm::ExecHint::CompileWithNonDeterministicValidation;
2884
2885 // We try once with `allow_unresolved_imports: false`. If this fails due to unresolved
2886 // import, we try again but with `allowed_unresolved_imports: true`.
2887 // Having unresolved imports might cause errors later on, for example when validating
2888 // transactions or getting the parachain heads, but for now we continue the execution
2889 // and print a warning.
2890 match executor::host::HostVmPrototype::new(executor::host::Config {
2891 module,
2892 heap_pages,
2893 exec_hint,
2894 allow_unresolved_imports: false,
2895 }) {
2896 Ok(vm) => Ok(vm),
2897 Err(executor::host::NewErr::VirtualMachine(
2898 executor::vm::NewErr::UnresolvedFunctionImport {
2899 function,
2900 module_name,
2901 },
2902 )) => {
2903 match executor::host::HostVmPrototype::new(executor::host::Config {
2904 module,
2905 heap_pages,
2906 exec_hint,
2907 allow_unresolved_imports: true,
2908 }) {
2909 Ok(vm) => {
2910 log!(
2911 platform,
2912 Warn,
2913 log_target,
2914 format!(
2915 "Unresolved host function in runtime: `{}`:`{}`. Smoldot might \
2916 encounter errors later on. Please report this issue in \
2917 https://github.com/smol-dot/smoldot",
2918 module_name, function
2919 )
2920 );
2921
2922 Ok(vm)
2923 }
2924 Err(executor::host::NewErr::VirtualMachine(
2925 executor::vm::NewErr::UnresolvedFunctionImport { .. },
2926 )) => unreachable!(),
2927 Err(error) => {
2928 // It's still possible that errors other than an unresolved host
2929 // function happen.
2930 Err(RuntimeError::Build(error))
2931 }
2932 }
2933 }
2934 Err(error) => Err(RuntimeError::Build(error)),
2935 }
2936}
2937
2938/// Returns `true` if the block can be assumed to have the same runtime as its parent.
2939fn same_runtime_as_parent(header: &[u8], block_number_bytes: usize) -> bool {
2940 match header::decode(header, block_number_bytes) {
2941 Ok(h) => !h.digest.has_runtime_environment_updated(),
2942 Err(_) => false,
2943 }
2944}
2945
2946fn download_runtime<TPlat: PlatformRef>(
2947 sync_service: Arc<sync_service::SyncService<TPlat>>,
2948 block_hash: [u8; 32],
2949 scale_encoded_header: &[u8],
2950) -> impl Future<
2951 Output = Result<
2952 (
2953 Option<Vec<u8>>,
2954 Option<Vec<u8>>,
2955 Option<Vec<u8>>,
2956 Option<Vec<Nibble>>,
2957 ),
2958 RuntimeDownloadError,
2959 >,
2960> + use<TPlat> {
2961 // In order to perform the download, we need to known the state root hash of the
2962 // block in question, which requires decoding the block. If the decoding fails,
2963 // we report that the asynchronous operation has failed with the hope that this
2964 // block gets pruned in the future.
2965 let block_info = match header::decode(scale_encoded_header, sync_service.block_number_bytes()) {
2966 Ok(decoded_header) => Ok((*decoded_header.state_root, decoded_header.number)),
2967 Err(error) => Err(RuntimeDownloadError::InvalidHeader(error)),
2968 };
2969
2970 async move {
2971 let (state_root, block_number) = block_info?;
2972
2973 let mut storage_code = None;
2974 let mut storage_heap_pages = None;
2975 let mut code_merkle_value = None;
2976 let mut code_closest_ancestor_excluding = None;
2977
2978 let mut query = sync_service
2979 .clone()
2980 .storage_query(
2981 block_number,
2982 block_hash,
2983 state_root,
2984 [
2985 sync_service::StorageRequestItem {
2986 key: b":code".to_vec(),
2987 ty: sync_service::StorageRequestItemTy::ClosestDescendantMerkleValue,
2988 },
2989 sync_service::StorageRequestItem {
2990 key: b":code".to_vec(),
2991 ty: sync_service::StorageRequestItemTy::Value,
2992 },
2993 sync_service::StorageRequestItem {
2994 key: b":heappages".to_vec(),
2995 ty: sync_service::StorageRequestItemTy::Value,
2996 },
2997 ]
2998 .into_iter(),
2999 3,
3000 Duration::from_secs(20),
3001 NonZero::<u32>::new(3).unwrap(),
3002 )
3003 .advance()
3004 .await;
3005
3006 loop {
3007 match query {
3008 sync_service::StorageQueryProgress::Finished => {
3009 break Ok((
3010 storage_code,
3011 storage_heap_pages,
3012 code_merkle_value,
3013 code_closest_ancestor_excluding,
3014 ));
3015 }
3016 sync_service::StorageQueryProgress::Progress {
3017 request_index: 0,
3018 item:
3019 sync_service::StorageResultItem::ClosestDescendantMerkleValue {
3020 closest_descendant_merkle_value,
3021 found_closest_ancestor_excluding,
3022 ..
3023 },
3024 query: next,
3025 } => {
3026 code_merkle_value = closest_descendant_merkle_value;
3027 code_closest_ancestor_excluding = found_closest_ancestor_excluding;
3028 query = next.advance().await;
3029 }
3030 sync_service::StorageQueryProgress::Progress {
3031 request_index: 1,
3032 item: sync_service::StorageResultItem::Value { value, .. },
3033 query: next,
3034 } => {
3035 storage_code = value;
3036 query = next.advance().await;
3037 }
3038 sync_service::StorageQueryProgress::Progress {
3039 request_index: 2,
3040 item: sync_service::StorageResultItem::Value { value, .. },
3041 query: next,
3042 } => {
3043 storage_heap_pages = value;
3044 query = next.advance().await;
3045 }
3046 sync_service::StorageQueryProgress::Progress { .. } => unreachable!(),
3047 sync_service::StorageQueryProgress::Error(error) => {
3048 break Err(RuntimeDownloadError::StorageQuery(error));
3049 }
3050 }
3051 }
3052 }
3053}
3054
3055/// Tries to perform a runtime call using the given call proof.
3056///
3057/// This function can have three possible outcomes: success, failure because the call proof is
3058/// invalid/incomplete, or failure because the execution fails.
3059///
3060/// This function is async in order to periodically yield during the execution.
3061async fn runtime_call_single_attempt<TPlat: PlatformRef>(
3062 platform: &TPlat,
3063 runtime: executor::host::HostVmPrototype,
3064 function_name: &str,
3065 parameters_vectored: &[u8],
3066 block_state_trie_root_hash: &[u8; 32],
3067 call_proof: &[u8],
3068) -> (
3069 SingleRuntimeCallTiming,
3070 Result<Vec<u8>, SingleRuntimeCallAttemptError>,
3071) {
3072 // Try to decode the proof. Succeed just means that the proof has the correct
3073 // encoding, and doesn't guarantee that the proof has all the necessary
3074 // entries.
3075 let call_proof = trie::proof_decode::decode_and_verify_proof(trie::proof_decode::Config {
3076 proof: call_proof,
3077 });
3078
3079 // Keep track of the total time taken by the runtime call attempt.
3080 let mut timing = SingleRuntimeCallTiming {
3081 virtual_machine_call_duration: Duration::new(0, 0),
3082 proof_access_duration: Duration::new(0, 0),
3083 };
3084
3085 // Attempt the runtime call.
3086 // If the call succeed, we interrupt the flow and `continue`.
3087 let runtime_call_duration_before = platform.now();
3088 let mut call = match executor::runtime_call::run(executor::runtime_call::Config {
3089 virtual_machine: runtime,
3090 function_to_call: function_name,
3091 parameter: iter::once(parameters_vectored),
3092 storage_proof_size_behavior:
3093 executor::runtime_call::StorageProofSizeBehavior::proof_recording_disabled(),
3094 storage_main_trie_changes: Default::default(),
3095 max_log_level: 0,
3096 calculate_trie_changes: false,
3097 }) {
3098 Ok(call) => call,
3099 Err((error, _)) => {
3100 // If starting the execution triggers an error, then the runtime call cannot
3101 // possibly succeed.
3102 // This can happen for example because the requested function doesn't exist.
3103 return (
3104 timing,
3105 Err(SingleRuntimeCallAttemptError::Execution(
3106 RuntimeCallExecutionError::Start(error),
3107 )),
3108 );
3109 }
3110 };
3111 timing.virtual_machine_call_duration += platform.now() - runtime_call_duration_before;
3112
3113 loop {
3114 let call_proof = match &call_proof {
3115 Ok(p) => p,
3116 Err(error) => {
3117 return (
3118 timing,
3119 Err(SingleRuntimeCallAttemptError::Inaccessible(
3120 RuntimeCallInaccessibleError::InvalidCallProof(error.clone()),
3121 )),
3122 );
3123 }
3124 };
3125
3126 // Yield once at every iteration. This avoids monopolizing the CPU for
3127 // too long.
3128 futures_lite::future::yield_now().await;
3129
3130 let child_trie = match call {
3131 executor::runtime_call::RuntimeCall::Finished(Ok(finished)) => {
3132 // Execution finished successfully.
3133 // This is the happy path.
3134 let output = finished.virtual_machine.value().as_ref().to_owned();
3135 return (timing, Ok(output));
3136 }
3137 executor::runtime_call::RuntimeCall::Finished(Err(error)) => {
3138 // Execution finished with an error.
3139 return (
3140 timing,
3141 Err(SingleRuntimeCallAttemptError::Execution(
3142 RuntimeCallExecutionError::Execution(error.detail),
3143 )),
3144 );
3145 }
3146 executor::runtime_call::RuntimeCall::StorageGet(ref get) => {
3147 get.child_trie().map(|c| c.as_ref().to_owned()) // TODO: overhead
3148 }
3149 executor::runtime_call::RuntimeCall::ClosestDescendantMerkleValue(ref mv) => {
3150 mv.child_trie().map(|c| c.as_ref().to_owned())
3151 } // TODO: overhead
3152 executor::runtime_call::RuntimeCall::NextKey(ref nk) => {
3153 nk.child_trie().map(|c| c.as_ref().to_owned()) // TODO: overhead
3154 }
3155 executor::runtime_call::RuntimeCall::SignatureVerification(r) => {
3156 let runtime_call_duration_before = platform.now();
3157 call = r.verify_and_resume();
3158 timing.virtual_machine_call_duration +=
3159 platform.now() - runtime_call_duration_before;
3160 continue;
3161 }
3162 executor::runtime_call::RuntimeCall::LogEmit(r) => {
3163 // Logs are ignored.
3164 let runtime_call_duration_before = platform.now();
3165 call = r.resume();
3166 timing.virtual_machine_call_duration +=
3167 platform.now() - runtime_call_duration_before;
3168 continue;
3169 }
3170 executor::runtime_call::RuntimeCall::Offchain(_) => {
3171 // Forbidden host function called.
3172 return (
3173 timing,
3174 Err(SingleRuntimeCallAttemptError::Execution(
3175 RuntimeCallExecutionError::ForbiddenHostFunction,
3176 )),
3177 );
3178 }
3179 executor::runtime_call::RuntimeCall::OffchainStorageSet(r) => {
3180 // Ignore offchain storage writes.
3181 let runtime_call_duration_before = platform.now();
3182 call = r.resume();
3183 timing.virtual_machine_call_duration +=
3184 platform.now() - runtime_call_duration_before;
3185 continue;
3186 }
3187 };
3188
3189 let proof_access_duration_before = platform.now();
3190 let trie_root = if let Some(child_trie) = child_trie {
3191 // TODO: allocation here, but probably not problematic
3192 const PREFIX: &[u8] = b":child_storage:default:";
3193 let mut key = Vec::with_capacity(PREFIX.len() + child_trie.len());
3194 key.extend_from_slice(PREFIX);
3195 key.extend_from_slice(child_trie.as_ref());
3196 match call_proof.storage_value(block_state_trie_root_hash, &key) {
3197 Err(_) => {
3198 return (
3199 timing,
3200 Err(SingleRuntimeCallAttemptError::Inaccessible(
3201 RuntimeCallInaccessibleError::MissingProofEntry,
3202 )),
3203 );
3204 }
3205 Ok(None) => None,
3206 Ok(Some((value, _))) => match <&[u8; 32]>::try_from(value) {
3207 Ok(hash) => Some(hash),
3208 Err(_) => {
3209 return (
3210 timing,
3211 Err(SingleRuntimeCallAttemptError::Inaccessible(
3212 RuntimeCallInaccessibleError::MissingProofEntry,
3213 )),
3214 );
3215 }
3216 },
3217 }
3218 } else {
3219 Some(block_state_trie_root_hash)
3220 };
3221
3222 match call {
3223 executor::runtime_call::RuntimeCall::StorageGet(get) => {
3224 let storage_value = if let Some(trie_root) = trie_root {
3225 call_proof.storage_value(trie_root, get.key().as_ref())
3226 } else {
3227 Ok(None)
3228 };
3229 let Ok(storage_value) = storage_value else {
3230 return (
3231 timing,
3232 Err(SingleRuntimeCallAttemptError::Inaccessible(
3233 RuntimeCallInaccessibleError::MissingProofEntry,
3234 )),
3235 );
3236 };
3237 timing.proof_access_duration += platform.now() - proof_access_duration_before;
3238
3239 let runtime_call_duration_before = platform.now();
3240 call = get.inject_value(storage_value.map(|(val, vers)| (iter::once(val), vers)));
3241 timing.virtual_machine_call_duration +=
3242 platform.now() - runtime_call_duration_before;
3243 }
3244 executor::runtime_call::RuntimeCall::ClosestDescendantMerkleValue(mv) => {
3245 let merkle_value = if let Some(trie_root) = trie_root {
3246 call_proof.closest_descendant_merkle_value(trie_root, mv.key())
3247 } else {
3248 Ok(None)
3249 };
3250 let Ok(merkle_value) = merkle_value else {
3251 return (
3252 timing,
3253 Err(SingleRuntimeCallAttemptError::Inaccessible(
3254 RuntimeCallInaccessibleError::MissingProofEntry,
3255 )),
3256 );
3257 };
3258 timing.proof_access_duration += platform.now() - proof_access_duration_before;
3259
3260 let runtime_call_duration_before = platform.now();
3261 call = mv.inject_merkle_value(merkle_value);
3262 timing.virtual_machine_call_duration +=
3263 platform.now() - runtime_call_duration_before;
3264 }
3265 executor::runtime_call::RuntimeCall::NextKey(nk) => {
3266 let next_key = if let Some(trie_root) = trie_root {
3267 call_proof.next_key(
3268 trie_root,
3269 nk.key(),
3270 nk.or_equal(),
3271 nk.prefix(),
3272 nk.branch_nodes(),
3273 )
3274 } else {
3275 Ok(None)
3276 };
3277 let Ok(next_key) = next_key else {
3278 return (
3279 timing,
3280 Err(SingleRuntimeCallAttemptError::Inaccessible(
3281 RuntimeCallInaccessibleError::MissingProofEntry,
3282 )),
3283 );
3284 };
3285 timing.proof_access_duration += platform.now() - proof_access_duration_before;
3286
3287 let runtime_call_duration_before = platform.now();
3288 call = nk.inject_key(next_key);
3289 timing.virtual_machine_call_duration +=
3290 platform.now() - runtime_call_duration_before;
3291 }
3292 _ => unreachable!(),
3293 }
3294 }
3295}
3296
/// Time measurements reported by [`runtime_call_single_attempt`].
#[derive(Clone, Debug)]
struct SingleRuntimeCallTiming {
    /// Time spent executing the virtual machine.
    virtual_machine_call_duration: Duration,
    /// Time spent accessing the call proof.
    proof_access_duration: Duration,
}
3305
3306/// See [`runtime_call_single_attempt`].
3307#[derive(Debug, derive::Display, derive_more::Error, Clone)]
3308enum SingleRuntimeCallAttemptError {
3309 /// Error during the execution of the runtime.
3310 ///
3311 /// There is no point in trying the same call again, as it would result in the same error.
3312 #[display("Error during the execution of the runtime: {_0}")]
3313 Execution(RuntimeCallExecutionError),
3314
3315 /// Error trying to access the storage required for the runtime call.
3316 ///
3317 /// Trying the same call again might succeed.
3318 #[display("Error trying to access the storage required for the runtime call: {_0}")]
3319 Inaccessible(RuntimeCallInaccessibleError),
3320}