casper_node/components/storage.rs

//! Central storage component.
//!
//! The central storage component is in charge of persisting data to disk. Its core functionalities
//! are:
//!
//! * storing and loading blocks,
//! * storing and loading deploys,
//! * [temporary until refactored] holding `DeployExecutionInfo` for each deploy,
//! * keeping an index of blocks by height, and
//! * [unimplemented] managing disk usage by pruning blocks and deploys from storage.
//!
//! All I/O performed by the component is done on the event handling thread. This is deliberate:
//! the assumption is that LMDB's caching will offset any gains from offloading I/O onto a separate
//! thread, while keeping the maximum event processing time reasonable.
//!
//! ## Consistency
//!
//! The storage component upholds a few internal invariants:
//!
//! * [temporary until refactored] Storing an execution result for a deploy in the context of a
//!   block is idempotent: storing the same result twice is a no-op, whilst attempting to store a
//!   differing one causes a fatal error.
//! * Only one block can ever be associated with a specific block height. Attempting to store a
//!   block at a height already occupied by a different block causes a fatal error.
//! * Storing a deploy or block that already exists (same hash) is fine and is silently accepted.
//!
//! ## Errors
//!
//! The storage component itself is panic-free and in general reports three classes of errors:
//! corruption, temporary resource exhaustion, and potential bugs.
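//!
//! ## Example
//!
//! A minimal construction sketch. This is illustrative only: every argument value below is a
//! placeholder assumed for the example rather than a recommended setting.
//!
//! ```ignore
//! let storage = Storage::new(
//!     &cfg,               // `WithDir<Config>` taken from the node configuration
//!     None,               // no hard reset to the start of an era
//!     protocol_version,   // current `ProtocolVersion`
//!     activation_era,     // activation point era of the current protocol version
//!     "casper-example",   // network name (placeholder)
//!     max_ttl,            // maximum transaction TTL from the chainspec
//!     5,                  // `recent_era_count` (placeholder)
//!     Some(&registry),    // optional Prometheus registry for metrics
//!     false,              // do not force a resync
//!     transaction_config, // transaction config from the chainspec
//! )?;
//! ```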
32
33mod config;
34pub(crate) mod disjoint_sequences;
35mod error;
36mod event;
37mod metrics;
38mod object_pool;
39#[cfg(test)]
40mod tests;
41mod utils;
42
43use casper_storage::block_store::{
44    lmdb::{IndexedLmdbBlockStore, LmdbBlockStore},
45    types::{
46        ApprovalsHashes, BlockExecutionResults, BlockHashHeightAndEra, BlockHeight, BlockTransfers,
47        LatestSwitchBlock, StateStore, StateStoreKey, Tip, TransactionFinalizedApprovals,
48    },
49    BlockStoreError, BlockStoreProvider, BlockStoreTransaction, DataReader, DataWriter,
50};
51
52use std::{
53    borrow::Cow,
54    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
55    convert::TryInto,
56    fmt::{self, Display, Formatter},
57    fs::{self, OpenOptions},
58    io::ErrorKind,
59    path::{Path, PathBuf},
60    sync::Arc,
61};
62
63use casper_storage::DbRawBytesSpec;
64#[cfg(test)]
65use casper_types::BlockWithSignatures;
66use casper_types::{
67    bytesrepr::{FromBytes, ToBytes},
68    execution::{execution_result_v1, ExecutionResult, ExecutionResultV1},
69    Approval, ApprovalsHash, AvailableBlockRange, Block, BlockBody, BlockHash, BlockHeader,
70    BlockHeaderWithSignatures, BlockSignatures, BlockSignaturesV1, BlockSignaturesV2, BlockV2,
71    ChainNameDigest, DeployHash, EraId, ExecutionInfo, FinalitySignature, ProtocolVersion,
72    Timestamp, Transaction, TransactionConfig, TransactionHash, TransactionId, Transfer, U512,
73};
74use datasize::DataSize;
75use num_rational::Ratio;
76use prometheus::Registry;
77use smallvec::SmallVec;
78use tracing::{debug, error, info, warn};
79
80use crate::{
81    components::{
82        fetcher::{FetchItem, FetchResponse},
83        Component,
84    },
85    effect::{
86        announcements::FatalAnnouncement,
87        incoming::{NetRequest, NetRequestIncoming},
88        requests::{MarkBlockCompletedRequest, NetworkRequest, StorageRequest},
89        EffectBuilder, EffectExt, Effects,
90    },
91    fatal,
92    protocol::Message,
93    types::{
94        BlockExecutionResultsOrChunk, BlockExecutionResultsOrChunkId, BlockWithMetadata,
95        ExecutableBlock, LegacyDeploy, MaxTtl, NodeId, NodeRng, SyncLeap, SyncLeapIdentifier,
96        TransactionHeader, VariantMismatch,
97    },
98    utils::{display_error, WithDir},
99};
100
101pub use config::Config;
102use disjoint_sequences::{DisjointSequences, Sequence};
103pub use error::FatalStorageError;
104use error::GetRequestError;
105pub(crate) use event::Event;
106use metrics::Metrics;
107use object_pool::ObjectPool;
108
109const COMPONENT_NAME: &str = "storage";
110
111/// Key under which completed blocks are to be stored.
112const COMPLETED_BLOCKS_STORAGE_KEY: &[u8] = b"completed_blocks_disjoint_sequences";
113/// Name of the file created when initializing a force resync.
114const FORCE_RESYNC_FILE_NAME: &str = "force_resync";
115
116const STORAGE_FILES: [&str; 5] = [
117    "data.lmdb",
118    "data.lmdb-lock",
119    "storage.lmdb",
120    "storage.lmdb-lock",
121    "sse_index",
122];
123
124/// The storage component.
125#[derive(DataSize, Debug)]
126pub struct Storage {
127    /// Storage location.
128    root: PathBuf,
129    /// Block store
130    pub(crate) block_store: IndexedLmdbBlockStore,
131    /// Runs of completed blocks known in storage.
132    completed_blocks: DisjointSequences,
133    /// The activation point era of the current protocol version.
134    activation_era: EraId,
135    /// The height of the final switch block of the previous protocol version.
136    key_block_height_for_activation_point: Option<u64>,
137    /// Whether or not memory deduplication is enabled.
138    enable_mem_deduplication: bool,
139    /// An in-memory pool of already loaded serialized items.
140    ///
141    /// Keyed by serialized item ID, contains the serialized item.
142    serialized_item_pool: ObjectPool<Box<[u8]>>,
143    /// The number of eras relative to the highest block's era which are considered as recent for
144    /// the purpose of deciding how to respond to a `NetRequest::SyncLeap`.
145    recent_era_count: u64,
146    #[data_size(skip)]
147    metrics: Option<Metrics>,
148    /// The maximum TTL of a deploy.
149    max_ttl: MaxTtl,
150    /// The hash of the chain name.
151    chain_name_hash: ChainNameDigest,
152    /// The transaction config as specified by the chainspec.
153    transaction_config: TransactionConfig,
    /// Cache of block utilization scores, keyed by era ID and then by block height.
155    utilization_tracker: BTreeMap<EraId, BTreeMap<u64, u64>>,
156}
157
158pub(crate) enum HighestOrphanedBlockResult {
159    MissingHighestSequence,
160    Orphan(BlockHeader),
161    MissingHeader(u64),
162}
163
164impl Display for HighestOrphanedBlockResult {
165    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
166        match self {
167            HighestOrphanedBlockResult::MissingHighestSequence => {
168                write!(f, "missing highest sequence")
169            }
170            HighestOrphanedBlockResult::Orphan(block_header) => write!(
171                f,
172                "orphan, height={}, hash={}",
173                block_header.height(),
174                block_header.block_hash()
175            ),
176            HighestOrphanedBlockResult::MissingHeader(height) => {
177                write!(f, "missing header for block at height: {}", height)
178            }
179        }
180    }
181}
182
183impl<REv> Component<REv> for Storage
184where
185    REv: From<FatalAnnouncement> + From<NetworkRequest<Message>> + Send,
186{
187    type Event = Event;
188
189    fn handle_event(
190        &mut self,
191        effect_builder: EffectBuilder<REv>,
192        _rng: &mut NodeRng,
193        event: Self::Event,
194    ) -> Effects<Self::Event> {
195        let result = match event {
196            Event::StorageRequest(req) => self.handle_storage_request(*req),
197            Event::NetRequestIncoming(ref incoming) => {
198                match self.handle_net_request_incoming::<REv>(effect_builder, incoming) {
199                    Ok(effects) => Ok(effects),
200                    Err(GetRequestError::Fatal(fatal_error)) => Err(fatal_error),
201                    Err(ref other_err) => {
202                        warn!(
203                            sender=%incoming.sender,
204                            err=display_error(other_err),
205                            "error handling net request"
206                        );
207                        // We could still send the requester a "not found" message, and could do
208                        // so even in the fatal case, but it is safer to not do so at the
209                        // moment, giving less surface area for possible amplification attacks.
210                        Ok(Effects::new())
211                    }
212                }
213            }
214            Event::MarkBlockCompletedRequest(req) => self.handle_mark_block_completed_request(req),
215            Event::MakeBlockExecutableRequest(req) => {
216                let ret = self.make_executable_block(&req.block_hash);
217                match ret {
218                    Ok(maybe) => Ok(req.responder.respond(maybe).ignore()),
219                    Err(err) => Err(err),
220                }
221            }
222        };
223
        // Any error is turned into a fatal effect; the component itself does not panic. Note
        // that we are dropping a lot of responders this way, but since we are crashing with
        // fatal anyway, it should not matter.
227        match result {
228            Ok(effects) => effects,
229            Err(err) => fatal!(effect_builder, "storage error: {}", err).ignore(),
230        }
231    }
232
233    fn name(&self) -> &str {
234        COMPONENT_NAME
235    }
236}
237
238impl Storage {
239    /// Creates a new storage component.
240    #[allow(clippy::too_many_arguments)]
241    pub fn new(
242        cfg: &WithDir<Config>,
243        hard_reset_to_start_of_era: Option<EraId>,
244        protocol_version: ProtocolVersion,
245        activation_era: EraId,
246        network_name: &str,
247        max_ttl: MaxTtl,
248        recent_era_count: u64,
249        registry: Option<&Registry>,
250        force_resync: bool,
251        transaction_config: TransactionConfig,
252    ) -> Result<Self, FatalStorageError> {
253        let config = cfg.value();
254
255        // Create the database directory.
256        let mut root = cfg.with_dir(config.path.clone());
257        let network_subdir = root.join(network_name);
258
259        if !network_subdir.exists() {
260            fs::create_dir_all(&network_subdir).map_err(|err| {
261                FatalStorageError::CreateDatabaseDirectory(network_subdir.clone(), err)
262            })?;
263        }
264
265        if should_move_storage_files_to_network_subdir(&root, &STORAGE_FILES)? {
266            move_storage_files_to_network_subdir(&root, &network_subdir, &STORAGE_FILES)?;
267        }
268
269        root = network_subdir;
270
271        // Calculate the upper bound for the memory map that is potentially used.
272        let total_size = config
273            .max_block_store_size
274            .saturating_add(config.max_deploy_store_size)
275            .saturating_add(config.max_deploy_metadata_store_size);
276
277        let block_store = LmdbBlockStore::new(root.as_path(), total_size)?;
278        let indexed_block_store =
279            IndexedLmdbBlockStore::new(block_store, hard_reset_to_start_of_era, protocol_version)?;
280
281        let metrics = registry.map(Metrics::new).transpose()?;
282
283        let mut component = Self {
284            root,
285            block_store: indexed_block_store,
286            completed_blocks: Default::default(),
287            activation_era,
288            key_block_height_for_activation_point: None,
289            enable_mem_deduplication: config.enable_mem_deduplication,
290            serialized_item_pool: ObjectPool::new(config.mem_pool_prune_interval),
291            recent_era_count,
292            max_ttl,
293            utilization_tracker: BTreeMap::new(),
294            metrics,
295            chain_name_hash: ChainNameDigest::from_chain_name(network_name),
296            transaction_config,
297        };
298
299        if force_resync {
300            let force_resync_file_path = component.root_path().join(FORCE_RESYNC_FILE_NAME);
            // Check if a resync is already in progress: force resync will kick in only if the
            // marker file didn't exist before. Use `OpenOptions::create_new` to atomically check
            // for the file's presence and create it if necessary.
305            match OpenOptions::new()
306                .create_new(true)
307                .write(true)
308                .open(&force_resync_file_path)
309            {
310                Ok(_file) => {
311                    // When the force resync marker file was not present and
312                    // is now created, initialize force resync.
313                    info!("initializing force resync");
314                    // Default `storage.completed_blocks`.
315                    component.completed_blocks = Default::default();
316                    component.persist_completed_blocks()?;
317                    // Exit the initialization function early.
318                    return Ok(component);
319                }
320                Err(io_err) if io_err.kind() == ErrorKind::AlreadyExists => {
321                    info!("skipping force resync as marker file exists");
322                }
323                Err(io_err) => {
324                    warn!(
325                        "couldn't operate on the force resync marker file at path {}: {}",
326                        force_resync_file_path.to_string_lossy(),
327                        io_err
328                    );
329                }
330            }
331        }
332
333        {
334            let ro_txn = component.block_store.checkout_ro()?;
335            let maybe_state_store: Option<Vec<u8>> = ro_txn.read(StateStoreKey::new(
336                Cow::Borrowed(COMPLETED_BLOCKS_STORAGE_KEY),
337            ))?;
338            match maybe_state_store {
339                Some(raw) => {
340                    let (mut sequences, _) = DisjointSequences::from_vec(raw)
341                        .map_err(FatalStorageError::UnexpectedDeserializationFailure)?;
342
343                    // Truncate the sequences in case we removed blocks via a hard reset.
344                    if let Some(header) = DataReader::<Tip, BlockHeader>::read(&ro_txn, Tip)? {
345                        sequences.truncate(header.height());
346                    }
347
348                    component.completed_blocks = sequences;
349                }
350                None => {
                    // No state so far. We can make the following observations:
                    //
                    // 1. Any block already in storage from versions prior to 1.5 (no fast-sync)
                    //    MUST have the corresponding global state in contract runtime due to the
                    //    way sync worked previously, so with the potential exception of finality
                    //    signatures, we can consider all these blocks complete.
                    // 2. Any block acquired from that point onwards was subject to the insertion
                    //    of the appropriate announcements (`BlockCompletedAnnouncement`), which
                    //    would have caused the creation of the completed blocks index, thus would
                    //    not have resulted in a `None` value here.
                    //
                    // Note that a previous run of this version which aborted early could have
                    // stored some blocks and/or block headers without completing the sync
                    // process. Hence, when setting `completed_blocks` in this `None` case, we
                    // only consider blocks from a previous protocol version as complete.
368
369                    let maybe_block_header: Option<BlockHeader> = ro_txn.read(Tip)?;
370                    if let Some(highest_block_header) = maybe_block_header {
371                        for height in (0..=highest_block_header.height()).rev() {
372                            let maybe_header: Option<BlockHeader> = ro_txn.read(height)?;
373                            match maybe_header {
374                                Some(header) if header.protocol_version() < protocol_version => {
375                                    component.completed_blocks =
376                                        DisjointSequences::new(Sequence::new(0, header.height()));
377                                    break;
378                                }
379                                _ => {}
380                            }
381                        }
382                    };
383                }
384            }
385        }
386        component.persist_completed_blocks()?;
387        Ok(component)
388    }
389
390    /// Returns the path to the storage folder.
391    pub(crate) fn root_path(&self) -> &Path {
392        &self.root
393    }
394
395    fn handle_net_request_incoming<REv>(
396        &mut self,
397        effect_builder: EffectBuilder<REv>,
398        incoming: &NetRequestIncoming,
399    ) -> Result<Effects<Event>, GetRequestError>
400    where
401        REv: From<NetworkRequest<Message>> + Send,
402    {
403        if self.enable_mem_deduplication {
404            let unique_id = incoming.message.unique_id();
405
406            if let Some(serialized_item) = self
407                .serialized_item_pool
408                .get(AsRef::<[u8]>::as_ref(&unique_id))
409            {
410                // We found an item in the pool. We can short-circuit all
411                // deserialization/serialization and return the canned item
412                // immediately.
413                let found = Message::new_get_response_from_serialized(
414                    incoming.message.tag(),
415                    serialized_item,
416                );
417                return Ok(effect_builder.send_message(incoming.sender, found).ignore());
418            }
419        }
420
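        // No pooled copy was available (or deduplication is disabled), so look the item up in
        // storage and respond via `update_pool_and_send`.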
421        match *(incoming.message) {
422            NetRequest::Transaction(ref serialized_id) => {
423                let id = decode_item_id::<Transaction>(serialized_id)?;
424                let opt_item = self.get_transaction_by_id(id)?;
425                let fetch_response = FetchResponse::from_opt(id, opt_item);
426
427                Ok(self.update_pool_and_send(
428                    effect_builder,
429                    incoming.sender,
430                    serialized_id,
431                    fetch_response,
432                )?)
433            }
434            NetRequest::LegacyDeploy(ref serialized_id) => {
435                let id = decode_item_id::<LegacyDeploy>(serialized_id)?;
436                let opt_item = self.get_legacy_deploy(id)?;
437                let fetch_response = FetchResponse::from_opt(id, opt_item);
438
439                Ok(self.update_pool_and_send(
440                    effect_builder,
441                    incoming.sender,
442                    serialized_id,
443                    fetch_response,
444                )?)
445            }
446            NetRequest::Block(ref serialized_id) => {
447                let id = decode_item_id::<Block>(serialized_id)?;
448                let opt_item: Option<Block> = self
449                    .block_store
450                    .checkout_ro()
451                    .map_err(FatalStorageError::from)?
452                    .read(id)
453                    .map_err(FatalStorageError::from)?;
454                let fetch_response = FetchResponse::from_opt(id, opt_item);
455
456                Ok(self.update_pool_and_send(
457                    effect_builder,
458                    incoming.sender,
459                    serialized_id,
460                    fetch_response,
461                )?)
462            }
463            NetRequest::BlockHeader(ref serialized_id) => {
464                let item_id = decode_item_id::<BlockHeader>(serialized_id)?;
465                let opt_item: Option<BlockHeader> = self
466                    .block_store
467                    .checkout_ro()
468                    .map_err(FatalStorageError::from)?
469                    .read(item_id)
470                    .map_err(FatalStorageError::from)?;
471                let fetch_response = FetchResponse::from_opt(item_id, opt_item);
472
473                Ok(self.update_pool_and_send(
474                    effect_builder,
475                    incoming.sender,
476                    serialized_id,
477                    fetch_response,
478                )?)
479            }
480            NetRequest::FinalitySignature(ref serialized_id) => {
481                let id = decode_item_id::<FinalitySignature>(serialized_id)?;
482                let opt_item = self
483                    .block_store
484                    .checkout_ro()
485                    .map_err(FatalStorageError::from)?
486                    .read(*id.block_hash())
487                    .map_err(FatalStorageError::from)?
488                    .and_then(|block_signatures: BlockSignatures| {
489                        block_signatures.finality_signature(id.public_key())
490                    });
491
492                if let Some(item) = opt_item.as_ref() {
493                    if item.block_hash() != id.block_hash() || item.era_id() != id.era_id() {
494                        return Err(GetRequestError::FinalitySignatureIdMismatch {
495                            requested_id: id,
496                            finality_signature: Box::new(item.clone()),
497                        });
498                    }
499                }
500                let fetch_response = FetchResponse::from_opt(id, opt_item);
501
502                Ok(self.update_pool_and_send(
503                    effect_builder,
504                    incoming.sender,
505                    serialized_id,
506                    fetch_response,
507                )?)
508            }
509            NetRequest::SyncLeap(ref serialized_id) => {
510                let item_id = decode_item_id::<SyncLeap>(serialized_id)?;
511                let fetch_response = self.get_sync_leap(item_id)?;
512
513                Ok(self.update_pool_and_send(
514                    effect_builder,
515                    incoming.sender,
516                    serialized_id,
517                    fetch_response,
518                )?)
519            }
520            NetRequest::ApprovalsHashes(ref serialized_id) => {
521                let item_id = decode_item_id::<ApprovalsHashes>(serialized_id)?;
522                let opt_item: Option<ApprovalsHashes> = self
523                    .block_store
524                    .checkout_ro()
525                    .map_err(FatalStorageError::from)?
526                    .read(item_id)
527                    .map_err(FatalStorageError::from)?;
528                let fetch_response = FetchResponse::from_opt(item_id, opt_item);
529
530                Ok(self.update_pool_and_send(
531                    effect_builder,
532                    incoming.sender,
533                    serialized_id,
534                    fetch_response,
535                )?)
536            }
537            NetRequest::BlockExecutionResults(ref serialized_id) => {
538                let item_id = decode_item_id::<BlockExecutionResultsOrChunk>(serialized_id)?;
539                let opt_item = self.read_block_execution_results_or_chunk(&item_id)?;
540                let fetch_response = FetchResponse::from_opt(item_id, opt_item);
541
542                Ok(self.update_pool_and_send(
543                    effect_builder,
544                    incoming.sender,
545                    serialized_id,
546                    fetch_response,
547                )?)
548            }
549        }
550    }
551
552    /// Handles a storage request.
553    fn handle_storage_request(
554        &mut self,
555        req: StorageRequest,
556    ) -> Result<Effects<Event>, FatalStorageError> {
        // Note: Database IO is handled in a blocking fashion on purpose throughout this function.
        // The rationale is that long IO operations are very rare and cache hits are frequent, so
        // on average the actual execution time will be very low.
560        Ok(match req {
561            StorageRequest::PutBlock { block, responder } => {
562                let mut rw_txn = self.block_store.checkout_rw()?;
563                let _ = rw_txn.write(&*block)?;
564                rw_txn.commit()?;
565                responder.respond(true).ignore()
566            }
567            StorageRequest::PutApprovalsHashes {
568                approvals_hashes,
569                responder,
570            } => {
571                let mut rw_txn = self.block_store.checkout_rw()?;
572                let _ = rw_txn.write(&*approvals_hashes)?;
573                rw_txn.commit()?;
574                responder.respond(true).ignore()
575            }
576            StorageRequest::GetBlock {
577                block_hash,
578                responder,
579            } => {
580                let maybe_block = self.block_store.checkout_ro()?.read(block_hash)?;
581                responder.respond(maybe_block).ignore()
582            }
583            StorageRequest::IsBlockStored {
584                block_hash,
585                responder,
586            } => {
587                let txn = self.block_store.checkout_ro()?;
588                responder
589                    .respond(DataReader::<BlockHash, Block>::exists(&txn, block_hash)?)
590                    .ignore()
591            }
592            StorageRequest::GetApprovalsHashes {
593                block_hash,
594                responder,
595            } => responder
596                .respond(self.block_store.checkout_ro()?.read(block_hash)?)
597                .ignore(),
598            StorageRequest::GetHighestCompleteBlock { responder } => responder
599                .respond(self.get_highest_complete_block()?)
600                .ignore(),
601            StorageRequest::GetHighestCompleteBlockHeader { responder } => responder
602                .respond(self.get_highest_complete_block_header()?)
603                .ignore(),
604            StorageRequest::GetTransactionsEraIds {
605                transaction_hashes,
606                responder,
607            } => {
608                let mut era_ids = HashSet::new();
609                let txn = self.block_store.checkout_ro()?;
610                for transaction_hash in &transaction_hashes {
611                    let maybe_block_info: Option<BlockHashHeightAndEra> =
612                        txn.read(*transaction_hash)?;
613                    if let Some(block_info) = maybe_block_info {
614                        era_ids.insert(block_info.era_id);
615                    }
616                }
617                responder.respond(era_ids).ignore()
618            }
619            StorageRequest::GetBlockHeader {
620                block_hash,
621                only_from_available_block_range,
622                responder,
623            } => {
624                let txn = self.block_store.checkout_ro()?;
625                responder
626                    .respond(self.get_single_block_header_restricted(
627                        &txn,
628                        &block_hash,
629                        only_from_available_block_range,
630                    )?)
631                    .ignore()
632            }
633            StorageRequest::GetBlockTransfers {
634                block_hash,
635                responder,
636            } => {
637                let maybe_transfers = self.get_transfers(&block_hash)?;
638                responder.respond(maybe_transfers).ignore()
639            }
640            StorageRequest::PutTransaction {
641                transaction,
642                responder,
643            } => {
644                let mut rw_txn = self.block_store.checkout_rw()?;
645                if DataReader::<TransactionHash, Transaction>::exists(&rw_txn, transaction.hash())?
646                {
647                    responder.respond(false).ignore()
648                } else {
649                    let _ = rw_txn.write(&*transaction)?;
650                    rw_txn.commit()?;
651                    responder.respond(true).ignore()
652                }
653            }
654            StorageRequest::GetTransactions {
655                transaction_hashes,
656                responder,
657            } => responder
658                .respond(self.get_transactions_with_finalized_approvals(transaction_hashes.iter())?)
659                .ignore(),
660            StorageRequest::GetLegacyDeploy {
661                deploy_hash,
662                responder,
663            } => {
664                let maybe_legacy_deploy = self.get_legacy_deploy(deploy_hash)?;
665                responder.respond(maybe_legacy_deploy).ignore()
666            }
667            StorageRequest::GetTransaction {
668                transaction_id,
669                responder,
670            } => {
671                let ro_txn = self.block_store.checkout_ro()?;
672                let maybe_transaction = match Self::get_transaction_with_finalized_approvals(
673                    &ro_txn,
674                    &transaction_id.transaction_hash(),
675                )? {
676                    None => None,
677                    Some((transaction, maybe_approvals)) => {
678                        let transaction = if let Some(approvals) = maybe_approvals {
679                            transaction.with_approvals(approvals)
680                        } else {
681                            transaction
682                        };
683                        (transaction.fetch_id() == transaction_id).then_some(transaction)
684                    }
685                };
686                responder.respond(maybe_transaction).ignore()
687            }
688            StorageRequest::GetTransactionAndExecutionInfo {
689                transaction_hash,
690                with_finalized_approvals,
691                responder,
692            } => {
693                let ro_txn = self.block_store.checkout_ro()?;
694
695                let transaction = if with_finalized_approvals {
696                    match Self::get_transaction_with_finalized_approvals(
697                        &ro_txn,
698                        &transaction_hash,
699                    )? {
700                        Some((transaction, maybe_approvals)) => {
701                            if let Some(approvals) = maybe_approvals {
702                                transaction.with_approvals(approvals)
703                            } else {
704                                transaction
705                            }
706                        }
707                        None => return Ok(responder.respond(None).ignore()),
708                    }
709                } else {
710                    match ro_txn.read(transaction_hash)? {
711                        Some(transaction) => transaction,
712                        None => return Ok(responder.respond(None).ignore()),
713                    }
714                };
715
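                // The same transaction hash keys several records here: the transaction itself
                // (read above), its block hash/height/era info, and its execution result.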
716                let block_hash_height_and_era: BlockHashHeightAndEra =
717                    match ro_txn.read(transaction_hash)? {
718                        Some(value) => value,
719                        None => return Ok(responder.respond(Some((transaction, None))).ignore()),
720                    };
721
722                let execution_result = ro_txn.read(transaction_hash)?;
723                let execution_info = ExecutionInfo {
724                    block_hash: block_hash_height_and_era.block_hash,
725                    block_height: block_hash_height_and_era.block_height,
726                    execution_result,
727                };
728
729                responder
730                    .respond(Some((transaction, Some(execution_info))))
731                    .ignore()
732            }
733            StorageRequest::IsTransactionStored {
734                transaction_id,
735                responder,
736            } => {
737                let txn = self.block_store.checkout_ro()?;
738                let has_transaction = DataReader::<TransactionHash, Transaction>::exists(
739                    &txn,
740                    transaction_id.transaction_hash(),
741                )?;
742                responder.respond(has_transaction).ignore()
743            }
744            StorageRequest::GetExecutionResults {
745                block_hash,
746                responder,
747            } => {
748                let txn = self.block_store.checkout_ro()?;
749                responder
750                    .respond(Self::get_execution_results_with_transaction_headers(
751                        &txn,
752                        &block_hash,
753                    )?)
754                    .ignore()
755            }
756            StorageRequest::GetBlockExecutionResultsOrChunk { id, responder } => responder
757                .respond(self.read_block_execution_results_or_chunk(&id)?)
758                .ignore(),
759            StorageRequest::PutExecutionResults {
760                block_hash,
761                block_height,
762                era_id,
763                execution_results,
764                responder,
765            } => {
766                let mut rw_txn = self.block_store.checkout_rw()?;
767                let _ = rw_txn.write(&BlockExecutionResults {
768                    block_info: BlockHashHeightAndEra::new(*block_hash, block_height, era_id),
769                    exec_results: execution_results,
770                })?;
771                rw_txn.commit()?;
772                responder.respond(()).ignore()
773            }
774            StorageRequest::GetFinalitySignature { id, responder } => {
775                let maybe_sig = self
776                    .block_store
777                    .checkout_ro()?
778                    .read(*id.block_hash())?
779                    .and_then(|sigs: BlockSignatures| sigs.finality_signature(id.public_key()))
780                    .filter(|sig| sig.era_id() == id.era_id());
781                responder.respond(maybe_sig).ignore()
782            }
783            StorageRequest::IsFinalitySignatureStored { id, responder } => {
784                let has_signature = self
785                    .block_store
786                    .checkout_ro()?
787                    .read(*id.block_hash())?
788                    .map(|sigs: BlockSignatures| sigs.has_finality_signature(id.public_key()))
789                    .unwrap_or(false);
790                responder.respond(has_signature).ignore()
791            }
792            StorageRequest::GetBlockAndMetadataByHeight {
793                block_height,
794                only_from_available_block_range,
795                responder,
796            } => {
797                if !(self.should_return_block(block_height, only_from_available_block_range)) {
798                    return Ok(responder.respond(None).ignore());
799                }
800
801                let ro_txn = self.block_store.checkout_ro()?;
802
803                let block: Block = {
804                    if let Some(block) = ro_txn.read(block_height)? {
805                        block
806                    } else {
807                        return Ok(responder.respond(None).ignore());
808                    }
809                };
810
811                let hash = block.hash();
812                let block_signatures = match ro_txn.read(*hash)? {
813                    Some(signatures) => signatures,
814                    None => self.get_default_block_signatures(&block),
815                };
816                responder
817                    .respond(Some(BlockWithMetadata {
818                        block,
819                        block_signatures,
820                    }))
821                    .ignore()
822            }
823            StorageRequest::PutBlockSignatures {
824                signatures,
825                responder,
826            } => {
827                if signatures.is_empty() {
828                    error!(
829                        ?signatures,
830                        "should not attempt to store empty collection of block signatures"
831                    );
832                    return Ok(responder.respond(false).ignore());
833                }
834                let mut txn = self.block_store.checkout_rw()?;
835                let old_data: Option<BlockSignatures> = txn.read(*signatures.block_hash())?;
836                let new_data = match old_data {
837                    None => signatures,
838                    Some(mut data) => {
839                        if let Err(error) = data.merge(signatures) {
840                            error!(%error, "failed to put block signatures");
841                            return Ok(responder.respond(false).ignore());
842                        }
843                        data
844                    }
845                };
846                let _ = txn.write(&new_data)?;
847                txn.commit()?;
848                responder.respond(true).ignore()
849            }
850            StorageRequest::PutFinalitySignature {
851                signature,
852                responder,
853            } => {
854                let mut rw_txn = self.block_store.checkout_rw()?;
855                let block_hash = signature.block_hash();
856                let mut block_signatures: BlockSignatures =
857                    if let Some(existing_signatures) = rw_txn.read(*block_hash)? {
858                        existing_signatures
859                    } else {
860                        match &*signature {
861                            FinalitySignature::V1(signature) => {
862                                BlockSignaturesV1::new(*signature.block_hash(), signature.era_id())
863                                    .into()
864                            }
865                            FinalitySignature::V2(signature) => BlockSignaturesV2::new(
866                                *signature.block_hash(),
867                                signature.block_height(),
868                                signature.era_id(),
869                                signature.chain_name_hash(),
870                            )
871                            .into(),
872                        }
873                    };
874                match (&mut block_signatures, *signature) {
875                    (
876                        BlockSignatures::V1(ref mut block_signatures),
877                        FinalitySignature::V1(signature),
878                    ) => {
879                        block_signatures.insert_signature(
880                            signature.public_key().clone(),
881                            *signature.signature(),
882                        );
883                    }
884                    (
885                        BlockSignatures::V2(ref mut block_signatures),
886                        FinalitySignature::V2(signature),
887                    ) => {
888                        block_signatures.insert_signature(
889                            signature.public_key().clone(),
890                            *signature.signature(),
891                        );
892                    }
893                    (block_signatures, signature) => {
894                        let mismatch =
895                            VariantMismatch(Box::new((block_signatures.clone(), signature)));
896                        return Err(FatalStorageError::from(mismatch));
897                    }
898                }
899
                let _ = rw_txn.write(&block_signatures)?;
901                rw_txn.commit()?;
902                responder.respond(true).ignore()
903            }
904            StorageRequest::GetBlockSignature {
905                block_hash,
906                public_key,
907                responder,
908            } => {
909                let maybe_signatures: Option<BlockSignatures> =
910                    self.block_store.checkout_ro()?.read(block_hash)?;
911                responder
912                    .respond(
913                        maybe_signatures
914                            .and_then(|signatures| signatures.finality_signature(&public_key)),
915                    )
916                    .ignore()
917            }
918            StorageRequest::GetBlockHeaderByHeight {
919                block_height,
920                only_from_available_block_range,
921                responder,
922            } => {
923                let maybe_header = self
924                    .read_block_header_by_height(block_height, only_from_available_block_range)?;
925                responder.respond(maybe_header).ignore()
926            }
927            StorageRequest::GetLatestSwitchBlockHeader { responder } => {
928                let txn = self.block_store.checkout_ro()?;
929                let maybe_header = txn.read(LatestSwitchBlock)?;
930                responder.respond(maybe_header).ignore()
931            }
932            StorageRequest::GetSwitchBlockHeaderByEra { era_id, responder } => {
933                let txn = self.block_store.checkout_ro()?;
934                let maybe_header = txn.read(era_id)?;
935                responder.respond(maybe_header).ignore()
936            }
937            StorageRequest::PutBlockHeader {
938                block_header,
939                responder,
940            } => {
941                let mut rw_txn = self.block_store.checkout_rw()?;
942                let _ = rw_txn.write(&*block_header)?;
943                rw_txn.commit()?;
944                responder.respond(true).ignore()
945            }
946            StorageRequest::GetAvailableBlockRange { responder } => {
947                responder.respond(self.get_available_block_range()).ignore()
948            }
949            StorageRequest::StoreFinalizedApprovals {
950                ref transaction_hash,
951                ref finalized_approvals,
952                responder,
953            } => {
                info!(
                    transaction_hash = ?transaction_hash,
                    count = finalized_approvals.len(),
                    "storing finalized approvals {:?}",
                    finalized_approvals
                );
955                responder
956                    .respond(self.store_finalized_approvals(transaction_hash, finalized_approvals)?)
957                    .ignore()
958            }
959            StorageRequest::PutExecutedBlock {
960                block,
961                approvals_hashes,
962                execution_results,
963                responder,
964            } => {
965                let block: Block = (*block).clone().into();
966                let transaction_config = self.transaction_config.clone();
967                responder
968                    .respond(self.put_executed_block(
969                        transaction_config,
970                        &block,
971                        &approvals_hashes,
972                        execution_results,
973                    )?)
974                    .ignore()
975            }
976            StorageRequest::GetKeyBlockHeightForActivationPoint { responder } => {
977                // If we haven't already cached the height, try to retrieve the key block header.
978                if self.key_block_height_for_activation_point.is_none() {
979                    let key_block_era = self.activation_era.predecessor().unwrap_or_default();
980                    let txn = self.block_store.checkout_ro()?;
981                    let key_block_header: BlockHeader = match txn.read(key_block_era)? {
982                        Some(block_header) => block_header,
983                        None => return Ok(responder.respond(None).ignore()),
984                    };
985                    self.key_block_height_for_activation_point = Some(key_block_header.height());
986                }
987                responder
988                    .respond(self.key_block_height_for_activation_point)
989                    .ignore()
990            }
991            StorageRequest::GetRawData {
992                key,
993                responder,
994                record_id,
995            } => {
996                let db_table_id = utils::db_table_id_from_record_id(record_id)
997                    .map_err(|_| FatalStorageError::UnexpectedRecordId(record_id))?;
998                let txn = self.block_store.checkout_ro()?;
999                let maybe_data: Option<DbRawBytesSpec> = txn.read((db_table_id, key))?;
1000                match maybe_data {
1001                    None => responder.respond(None).ignore(),
1002                    Some(db_raw) => responder.respond(Some(db_raw)).ignore(),
1003                }
1004            }
1005            StorageRequest::GetBlockUtilizationScore {
1006                era_id,
1007                block_height,
1008                switch_block_utilization,
1009                responder,
1010            } => {
1011                let utilization = self.get_block_utilization_score(
1012                    era_id,
1013                    block_height,
1014                    switch_block_utilization,
1015                );
1016
1017                responder.respond(utilization).ignore()
1018            }
1019        })
1020    }
1021
1022    pub(crate) fn read_block_header_by_height(
1023        &self,
1024        block_height: u64,
1025        only_from_available_block_range: bool,
1026    ) -> Result<Option<BlockHeader>, FatalStorageError> {
1027        if !(self.should_return_block(block_height, only_from_available_block_range)) {
1028            Ok(None)
1029        } else {
1030            let txn = self.block_store.checkout_ro()?;
1031            txn.read(block_height).map_err(FatalStorageError::from)
1032        }
1033    }
1034
1035    pub(crate) fn get_switch_block_by_era_id(
1036        &self,
1037        era_id: &EraId,
1038    ) -> Result<Option<Block>, FatalStorageError> {
1039        let txn = self.block_store.checkout_ro()?;
1040        txn.read(*era_id).map_err(FatalStorageError::from)
1041    }
1042
1043    /// Retrieves a set of transactions, along with their potential finalized approvals.
1044    #[allow(clippy::type_complexity)]
1045    fn get_transactions_with_finalized_approvals<'a>(
1046        &self,
1047        transaction_hashes: impl Iterator<Item = &'a TransactionHash>,
1048    ) -> Result<SmallVec<[Option<(Transaction, Option<BTreeSet<Approval>>)>; 1]>, FatalStorageError>
1049    {
1050        let ro_txn = self.block_store.checkout_ro()?;
1051
1052        transaction_hashes
1053            .map(|transaction_hash| {
1054                Self::get_transaction_with_finalized_approvals(&ro_txn, transaction_hash)
1055            })
1056            .collect()
1057    }
1058
1059    pub(crate) fn put_executed_block(
1060        &mut self,
1061        transaction_config: TransactionConfig,
1062        block: &Block,
1063        approvals_hashes: &ApprovalsHashes,
1064        execution_results: HashMap<TransactionHash, ExecutionResult>,
1065    ) -> Result<bool, FatalStorageError> {
1066        let mut txn = self.block_store.checkout_rw()?;
1067        let era_id = block.era_id();
1068        let block_utilization_score = block.block_utilization(transaction_config.clone());
1069        let has_hit_slot_limit = block.has_hit_slot_capacity(transaction_config.clone());
1070        let block_hash = txn.write(block)?;
1071        let _ = txn.write(approvals_hashes)?;
1072        let block_info = BlockHashHeightAndEra::new(block_hash, block.height(), block.era_id());
1073
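        // Determine the block's utilization score: if the block hit the transaction slot limit,
        // use the slot-based score; with no execution results the score is zero; otherwise take
        // the maximum of the slot, size, and gas utilization percentages.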
1074        let utilization = if has_hit_slot_limit {
1075            debug!("Block is at slot capacity, using slot utilization score");
1076            block_utilization_score
1077        } else if execution_results.is_empty() {
1078            0u64
1079        } else {
1080            let total_gas_utilization = {
1081                let total_gas_limit: U512 = execution_results
1082                    .values()
1083                    .map(|results| match results {
1084                        ExecutionResult::V1(v1_result) => match v1_result {
1085                            ExecutionResultV1::Failure { cost, .. } => *cost,
1086                            ExecutionResultV1::Success { cost, .. } => *cost,
1087                        },
1088                        ExecutionResult::V2(v2_result) => v2_result.limit.value(),
1089                    })
1090                    .sum();
1091
1092                let consumed: u64 = total_gas_limit.as_u64();
1093                let block_gas_limit = transaction_config.block_gas_limit;
1094
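                // Express consumed gas as an integer percentage of the block gas limit, e.g.
                // 2_500_000 consumed against a 10_000_000 limit yields 25 (illustrative figures
                // only).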
1095                Ratio::new(consumed * 100u64, block_gas_limit).to_integer()
1096            };
1097            debug!("Gas utilization at {total_gas_utilization}");
1098
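            // Size utilization: the summed `size_estimate`s of V2 execution results as an
            // integer percentage of the maximum block size (V1 results carry no size estimate
            // and count as zero).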
1099            let total_size_utilization = {
1100                let size_used: u64 = execution_results
1101                    .values()
1102                    .map(|results| {
1103                        if let ExecutionResult::V2(result) = results {
1104                            result.size_estimate
1105                        } else {
1106                            0u64
1107                        }
1108                    })
1109                    .sum();
1110
1111                let block_size_limit = transaction_config.max_block_size as u64;
1112                Ratio::new(size_used * 100, block_size_limit).to_integer()
1113            };
1114
1115            debug!("Storage utilization at {total_size_utilization}");
1116
1117            let scores = [
1118                block_utilization_score,
1119                total_size_utilization,
1120                total_gas_utilization,
1121            ];
1122
            match scores.iter().max() {
                Some(max_utilization) => *max_utilization,
                None => {
                    // This should never happen, as we just created the `scores` array in order
                    // to find its maximum value.
1128                    warn!("Unable to determine max utilization, marking 0 utilization");
1129                    0u64
1130                }
1131            }
1132        };
1133
1134        debug!("Utilization for block is {utilization}");
1135
1136        let _ = txn.write(&BlockExecutionResults {
1137            block_info,
1138            exec_results: execution_results,
1139        })?;
1140        txn.commit()?;
1141
1142        match self.utilization_tracker.get_mut(&era_id) {
1143            Some(block_score) => {
1144                block_score.insert(block.height(), utilization);
1145            }
1146            None => {
1147                let mut block_score = BTreeMap::new();
1148                block_score.insert(block.height(), utilization);
1149                self.utilization_tracker.insert(era_id, block_score);
1150            }
1151        }
1152
1153        Ok(true)
1154    }
1155
    /// Handles a [`MarkBlockCompletedRequest`].
1157    fn handle_mark_block_completed_request(
1158        &mut self,
1159        MarkBlockCompletedRequest {
1160            block_height,
1161            responder,
1162        }: MarkBlockCompletedRequest,
1163    ) -> Result<Effects<Event>, FatalStorageError> {
1164        let is_new = self.mark_block_complete(block_height)?;
1165        Ok(responder.respond(is_new).ignore())
1166    }
1167
    /// Marks the block at height `block_height` as complete by inserting it into the
    /// `completed_blocks` index and persisting the updated index to disk.
1170    fn mark_block_complete(&mut self, block_height: u64) -> Result<bool, FatalStorageError> {
1171        let is_new = self.completed_blocks.insert(block_height);
1172        if is_new {
1173            self.persist_completed_blocks()?;
1174            info!(
1175                "Storage: marked block {} complete: {}",
1176                block_height,
1177                self.get_available_block_range()
1178            );
1179            self.update_chain_height_metrics();
1180        } else {
1181            debug!(
1182                "Storage: tried to mark already-complete block {} complete",
1183                block_height
1184            );
1185        }
1186        Ok(is_new)
1187    }
1188
1189    /// Persists the completed blocks disjoint sequences state to the database.
1190    fn persist_completed_blocks(&mut self) -> Result<(), FatalStorageError> {
1191        let serialized = self
1192            .completed_blocks
1193            .to_bytes()
1194            .map_err(FatalStorageError::UnexpectedSerializationFailure)?;
1195        let mut rw_txn = self.block_store.checkout_rw()?;
1196        rw_txn.write(&StateStore {
1197            key: Cow::Borrowed(COMPLETED_BLOCKS_STORAGE_KEY),
1198            value: serialized,
1199        })?;
1200        rw_txn.commit().map_err(FatalStorageError::from)
1201    }
1202
1203    /// Retrieves the height of the highest complete block (if any).
1204    pub(crate) fn highest_complete_block_height(&self) -> Option<u64> {
1205        self.completed_blocks.highest_sequence().map(Sequence::high)
1206    }
1207
1208    /// Retrieves the contiguous segment of the block chain starting at the highest known switch
1209    /// block such that the blocks' timestamps cover a duration of at least the max TTL for deploys
1210    /// (a chainspec setting).
1211    ///
1212    /// If storage doesn't hold enough blocks to cover the specified duration, it will still return
1213    /// the highest contiguous segment starting at the highest switch block which it does hold.
1214    pub(crate) fn read_blocks_for_replay_protection(
1215        &self,
1216    ) -> Result<Vec<Block>, FatalStorageError> {
1217        let ro_txn = self.block_store.checkout_ro()?;
1218
1219        let timestamp =
1220            match DataReader::<LatestSwitchBlock, BlockHeader>::read(&ro_txn, LatestSwitchBlock)? {
1221                Some(last_era_header) => last_era_header
1222                    .timestamp()
1223                    .saturating_sub(self.max_ttl.value()),
1224                None => Timestamp::now(),
1225            };
1226
1227        let mut blocks = Vec::new();
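        // Walk the completed-block sequences from the highest heights downwards, collecting
        // blocks until one is older than the replay-protection cutoff computed above.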
1228        for sequence in self.completed_blocks.sequences().iter().rev() {
1229            let hi = sequence.high();
1230            let low = sequence.low();
1231            for idx in (low..=hi).rev() {
1232                let maybe_block: Result<Option<Block>, BlockStoreError> = ro_txn.read(idx);
1233                match maybe_block {
1234                    Ok(Some(block)) => {
1235                        let should_continue = block.timestamp() >= timestamp;
1236                        blocks.push(block);
                        if !should_continue {
1238                            return Ok(blocks);
1239                        }
1240                    }
1241                    Ok(None) => {
1242                        continue;
1243                    }
1244                    Err(err) => return Err(FatalStorageError::BlockStoreError(err)),
1245                }
1246            }
1247        }
1248        Ok(blocks)
1249    }
1250
1251    /// Returns an executable block.
1252    pub(crate) fn make_executable_block(
1253        &self,
1254        block_hash: &BlockHash,
1255    ) -> Result<Option<ExecutableBlock>, FatalStorageError> {
1256        let (block, transactions) =
1257            match self.read_block_and_finalized_transactions_by_hash(*block_hash)? {
1258                Some(block_and_finalized_transactions) => block_and_finalized_transactions,
1259                None => {
1260                    error!(
1261                        ?block_hash,
                        "Storage: unable to make_executable_block for {}", block_hash
1263                    );
1264                    return Ok(None);
1265                }
1266            };
1267        let maybe_finalized_approvals: Option<ApprovalsHashes> =
1268            self.block_store.checkout_ro()?.read(*block.hash())?;
1269        if let Some(finalized_approvals) = maybe_finalized_approvals {
1270            if transactions.len() != finalized_approvals.approvals_hashes().len() {
1271                error!(
1272                    ?block_hash,
1273                    "Storage: transaction hashes length mismatch {}", block_hash
1274                );
1275                return Err(FatalStorageError::ApprovalsHashesLengthMismatch {
1276                    block_hash: *block_hash,
1277                    expected: transactions.len(),
1278                    actual: finalized_approvals.approvals_hashes().len(),
1279                });
1280            }
1281            for (transaction, hash) in transactions
1282                .iter()
1283                .zip(finalized_approvals.approvals_hashes())
1284            {
1285                let computed_hash = transaction.compute_approvals_hash().map_err(|error| {
1286                    error!(%error, "failed to serialize approvals");
1287                    FatalStorageError::UnexpectedSerializationFailure(error)
1288                })?;
1289                if computed_hash == hash {
1290                    continue;
1291                }
1292                // This should be unreachable as the `BlockSynchronizer` should ensure we have the
1293                // correct approvals before it then calls this method.  By returning `Ok(None)` the
1294                // node would be stalled at this block, but should eventually sync leap due to lack
1295                // of progress.  It would then backfill this block without executing it.
1296                error!(?block_hash, "Storage: transaction with incorrect approvals");
1297                return Ok(None);
1298            }
1299        }
1300
1301        let executable_block = ExecutableBlock::from_block_and_transactions(block, transactions);
1302        info!(%block_hash, "Storage: created {}", executable_block);
1303        Ok(Some(executable_block))
1304    }
1305
    /// Retrieves a single block and all of its transactions.
    /// Returns `Ok(None)` if the block is missing or is not a V2 block.
    fn read_block_and_finalized_transactions_by_hash(
        &self,
        block_hash: BlockHash,
    ) -> Result<Option<(BlockV2, Vec<Transaction>)>, FatalStorageError> {
        let txn = self.block_store.checkout_ro()?;

        let Some(block) = txn.read(block_hash)? else {
            debug!(
                ?block_hash,
                "Storage: read_block_and_finalized_transactions_by_hash failed to get block for {}",
                block_hash
            );
            return Ok(None);
        };

        let Block::V2(block) = block else {
            debug!(
                ?block_hash,
                "Storage: read_block_and_finalized_transactions_by_hash expected block V2 {}",
                block_hash
            );
            return Ok(None);
        };

        let transactions: Vec<Transaction> = self
            .get_transactions_with_finalized_approvals(block.all_transactions())?
            .into_iter()
            .flatten()
            .map(|(transaction, _)| transaction)
            .collect();

        Ok(Some((block, transactions)))
    }

    /// Retrieves the highest complete block header from storage, if one exists. May return an
    /// LMDB error.
    fn get_highest_complete_block_header(&self) -> Result<Option<BlockHeader>, FatalStorageError> {
        let highest_complete_block_height = match self.completed_blocks.highest_sequence() {
            Some(sequence) => sequence.high(),
            None => {
                return Ok(None);
            }
        };

        let txn = self.block_store.checkout_ro()?;
        txn.read(highest_complete_block_height)
            .map_err(FatalStorageError::from)
    }

    /// Retrieves the highest complete block header together with its block signatures from
    /// storage, if one exists. May return an LMDB error.
    fn get_highest_complete_block_header_with_signatures(
        &self,
        txn: &(impl DataReader<BlockHeight, BlockHeader> + DataReader<BlockHash, BlockSignatures>),
    ) -> Result<Option<BlockHeaderWithSignatures>, FatalStorageError> {
        let highest_complete_block_height = match self.completed_blocks.highest_sequence() {
            Some(sequence) => sequence.high(),
            None => {
                return Ok(None);
            }
        };

        let block_header: Option<BlockHeader> = txn.read(highest_complete_block_height)?;
        match block_header {
            Some(header) => {
                let block_header_hash = header.block_hash();
                let block_signatures: BlockSignatures = match txn.read(block_header_hash)? {
                    Some(signatures) => signatures,
                    None => match &header {
                        BlockHeader::V1(header) => BlockSignatures::V1(BlockSignaturesV1::new(
                            header.block_hash(),
                            header.era_id(),
                        )),
                        BlockHeader::V2(header) => BlockSignatures::V2(BlockSignaturesV2::new(
                            header.block_hash(),
                            header.height(),
                            header.era_id(),
                            self.chain_name_hash,
                        )),
                    },
                };
                Ok(Some(BlockHeaderWithSignatures::new(
                    header,
                    block_signatures,
                )))
            }
            None => Ok(None),
        }
    }

    /// Retrieves the highest complete block from storage, if one exists. May return an LMDB error.
    pub fn get_highest_complete_block(&self) -> Result<Option<Block>, FatalStorageError> {
        let highest_complete_block_height = match self.highest_complete_block_height() {
            Some(height) => height,
            None => {
                return Ok(None);
            }
        };

        let txn = self.block_store.checkout_ro()?;
        txn.read(highest_complete_block_height)
            .map_err(FatalStorageError::from)
    }

    /// Retrieves a single block header from storage using the given transaction, optionally
    /// restricted to headers of blocks within the available block range.
    fn get_single_block_header_restricted(
        &self,
        txn: &impl DataReader<BlockHash, BlockHeader>,
        block_hash: &BlockHash,
        only_from_available_block_range: bool,
    ) -> Result<Option<BlockHeader>, FatalStorageError> {
        let block_header = match txn.read(*block_hash)? {
            Some(header) => header,
            None => return Ok(None),
        };

        if !self.should_return_block(block_header.height(), only_from_available_block_range) {
            return Ok(None);
        }

        Ok(Some(block_header))
    }

    /// Returns headers of complete blocks of the trusted block's ancestors, back to the most
    /// recent switch block.
    fn get_trusted_ancestor_headers(
        &self,
        txn: &impl DataReader<BlockHash, BlockHeader>,
        trusted_block_header: &BlockHeader,
    ) -> Result<Option<Vec<BlockHeader>>, FatalStorageError> {
        if trusted_block_header.is_genesis() {
            return Ok(Some(vec![]));
        }
        let available_block_range = self.get_available_block_range();
        let mut result = vec![];
        let mut current_trusted_block_header = trusted_block_header.clone();
        loop {
            let parent_hash = current_trusted_block_header.parent_hash();
            let parent_block_header: BlockHeader = match txn.read(*parent_hash)? {
                Some(block_header) => block_header,
                None => {
                    warn!(%parent_hash, "block header not found");
                    return Ok(None);
                }
            };

            if !available_block_range.contains(parent_block_header.height()) {
                debug!(%parent_hash, "block header not complete");
                return Ok(None);
            }

            result.push(parent_block_header.clone());
            if parent_block_header.is_switch_block() || parent_block_header.is_genesis() {
                break;
            }
            current_trusted_block_header = parent_block_header;
        }
        Ok(Some(result))
    }

    /// Returns the headers of all known switch blocks after the trusted block but before the
    /// highest block, with their signatures, plus the signed highest block.
    fn get_block_headers_with_signatures(
        &self,
        txn: &(impl DataReader<BlockHash, BlockSignatures> + DataReader<EraId, BlockHeader>),
        trusted_block_header: &BlockHeader,
        highest_block_header_with_signatures: &BlockHeaderWithSignatures,
    ) -> Result<Option<Vec<BlockHeaderWithSignatures>>, FatalStorageError> {
        if trusted_block_header.block_hash()
            == highest_block_header_with_signatures
                .block_header()
                .block_hash()
        {
            return Ok(Some(vec![]));
        }

        let start_era_id: u64 = trusted_block_header.next_block_era_id().into();
        let current_era_id: u64 = highest_block_header_with_signatures
            .block_header()
            .era_id()
            .into();

        let mut result = vec![];

        for era_id in start_era_id..current_era_id {
            let maybe_block_header: Option<BlockHeader> = txn.read(EraId::from(era_id))?;
            match maybe_block_header {
                Some(block_header) => {
                    let block_signatures = match txn.read(block_header.block_hash())? {
                        Some(signatures) => signatures,
                        None => match &block_header {
                            BlockHeader::V1(header) => BlockSignatures::V1(BlockSignaturesV1::new(
                                header.block_hash(),
                                header.era_id(),
                            )),
                            BlockHeader::V2(header) => BlockSignatures::V2(BlockSignaturesV2::new(
                                header.block_hash(),
                                header.height(),
                                header.era_id(),
                                self.chain_name_hash,
                            )),
                        },
                    };
                    result.push(BlockHeaderWithSignatures::new(
                        block_header,
                        block_signatures,
                    ));
                }
                None => return Ok(None),
            }
        }
        result.push(highest_block_header_with_signatures.clone());

        Ok(Some(result))
    }

    /// Stores a set of finalized approvals if they differ from the approvals in the original
    /// transaction and from any finalized approvals already stored for it.
    ///
    /// Returns `true` if the provided approvals were stored.
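    ///
    /// The decision can be sketched as follows (illustrative pseudo-Rust, not the exact code):
    ///
    /// ```ignore
    /// // Stored only when the incoming approvals differ both from the transaction's original
    /// // approvals and from any finalized approvals already persisted for it.
    /// let stored = finalized_approvals != original_approvals
    ///     && Some(finalized_approvals) != existing_finalized_approvals.as_ref();
    /// ```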
    fn store_finalized_approvals(
        &mut self,
        transaction_hash: &TransactionHash,
        finalized_approvals: &BTreeSet<Approval>,
    ) -> Result<bool, FatalStorageError> {
        let mut txn = self.block_store.checkout_rw()?;
        let original_transaction: Transaction = txn.read(*transaction_hash)?.ok_or({
            FatalStorageError::UnexpectedFinalizedApprovals {
                transaction_hash: *transaction_hash,
            }
        })?;

        // Only store the finalized approvals if they are different from the original ones.
        let maybe_existing_finalized_approvals: Option<BTreeSet<Approval>> =
            txn.read(*transaction_hash)?;
        if maybe_existing_finalized_approvals.as_ref() == Some(finalized_approvals) {
            return Ok(false);
        }

        let original_approvals = original_transaction.approvals();
        if &original_approvals != finalized_approvals {
            let _ = txn.write(&TransactionFinalizedApprovals {
                transaction_hash: *transaction_hash,
                finalized_approvals: finalized_approvals.clone(),
            })?;
            txn.commit()?;
            return Ok(true);
        }

        Ok(false)
    }

    /// Retrieves the successful transfers associated with the given block.
    ///
    /// If there is no record of successful transfers for this block, the list will be rebuilt from
    /// the execution results and written back to storage.  The record could have been missing or
    /// incorrectly set to an empty collection due to previous synchronization and storage issues.
    /// See https://github.com/casper-network/casper-node/issues/4255 and
    /// https://github.com/casper-network/casper-node/issues/4268 for further info.
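    ///
    /// Hypothetical usage (`storage` and `block_hash` are illustrative names); `None` is only
    /// returned when the block itself is not stored:
    ///
    /// ```ignore
    /// if let Some(transfers) = storage.get_transfers(&block_hash)? {
    ///     // All successful transfers of the block, rebuilt from execution results if needed.
    ///     info!("block {} contains {} successful transfers", block_hash, transfers.len());
    /// }
    /// ```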
    fn get_transfers(
        &mut self,
        block_hash: &BlockHash,
    ) -> Result<Option<Vec<Transfer>>, FatalStorageError> {
        let mut rw_txn = self.block_store.checkout_rw()?;
        let maybe_transfers: Option<Vec<Transfer>> = rw_txn.read(*block_hash)?;
        if let Some(transfers) = maybe_transfers {
            if !transfers.is_empty() {
                return Ok(Some(transfers));
            }
        }

        let block: Block = match rw_txn.read(*block_hash)? {
            Some(block) => block,
            None => return Ok(None),
        };

        let deploy_hashes: Vec<DeployHash> = match block.clone_body() {
            BlockBody::V1(v1) => v1.deploy_and_transfer_hashes().copied().collect(),
            BlockBody::V2(v2) => v2
                .all_transactions()
                .filter_map(|transaction_hash| match transaction_hash {
                    TransactionHash::Deploy(deploy_hash) => Some(*deploy_hash),
                    TransactionHash::V1(_) => None,
                })
                .collect(),
        };

        let mut transfers: Vec<Transfer> = vec![];
        for deploy_hash in deploy_hashes {
            let transaction_hash = TransactionHash::Deploy(deploy_hash);
            let successful_xfers = match rw_txn.read(transaction_hash)? {
                Some(exec_result) => successful_transfers(&exec_result),
                None => {
                    error!(%deploy_hash, %block_hash, "should have exec result");
                    vec![]
                }
            };
            transfers.extend(successful_xfers);
        }
        rw_txn.write(&BlockTransfers {
            block_hash: *block_hash,
            transfers: transfers.clone(),
        })?;
        rw_txn.commit()?;
        Ok(Some(transfers))
    }

    /// Retrieves a legacy deploy from storage by its deploy hash.
    fn get_legacy_deploy(
        &self,
        deploy_hash: DeployHash,
    ) -> Result<Option<LegacyDeploy>, FatalStorageError> {
        let transaction_hash = TransactionHash::from(deploy_hash);
        let txn = self.block_store.checkout_ro()?;
        let transaction =
            match Self::get_transaction_with_finalized_approvals(&txn, &transaction_hash)? {
                Some((transaction, maybe_approvals)) => {
                    if let Some(approvals) = maybe_approvals {
                        transaction.with_approvals(approvals)
                    } else {
                        transaction
                    }
                }
                None => return Ok(None),
            };

        match transaction {
            Transaction::Deploy(deploy) => Ok(Some(LegacyDeploy::from(deploy))),
            transaction @ Transaction::V1(_) => {
                let mismatch = VariantMismatch(Box::new((transaction_hash, transaction)));
                error!(%mismatch, "failed getting legacy deploy");
                Err(FatalStorageError::from(mismatch))
            }
        }
    }

    /// Retrieves a transaction by transaction ID.
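    ///
    /// The transaction is only returned if its own approvals hash, or the hash of its stored
    /// finalized approvals, matches the approvals hash in the given ID; otherwise `Ok(None)` is
    /// returned.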
    fn get_transaction_by_id(
        &self,
        transaction_id: TransactionId,
    ) -> Result<Option<Transaction>, FatalStorageError> {
        let transaction_hash = transaction_id.transaction_hash();
        let txn = self.block_store.checkout_ro()?;

        let maybe_transaction: Option<Transaction> = txn.read(transaction_hash)?;
        let transaction: Transaction = match maybe_transaction {
            None => return Ok(None),
            Some(transaction) if transaction.fetch_id() == transaction_id => {
                return Ok(Some(transaction));
            }
            Some(transaction) => transaction,
        };

        let finalized_approvals = match txn.read(transaction_hash)? {
            None => return Ok(None),
            Some(approvals) => approvals,
        };

        match (
            transaction_id.approvals_hash(),
            finalized_approvals,
            transaction,
        ) {
            (approvals_hash, finalized_approvals, Transaction::Deploy(deploy)) => {
                match ApprovalsHash::compute(&finalized_approvals) {
                    Ok(computed_approvals_hash) if computed_approvals_hash == approvals_hash => {
                        let deploy = deploy.with_approvals(finalized_approvals);
                        Ok(Some(Transaction::from(deploy)))
                    }
                    Ok(_computed_approvals_hash) => Ok(None),
                    Err(error) => {
                        error!(%error, "failed to calculate finalized deploy approvals hash");
                        Err(FatalStorageError::UnexpectedSerializationFailure(error))
                    }
                }
            }
            (approvals_hash, finalized_approvals, Transaction::V1(transaction_v1)) => {
                match ApprovalsHash::compute(&finalized_approvals) {
                    Ok(computed_approvals_hash) if computed_approvals_hash == approvals_hash => {
                        let transaction_v1 = transaction_v1.with_approvals(finalized_approvals);
                        Ok(Some(Transaction::from(transaction_v1)))
                    }
                    Ok(_computed_approvals_hash) => Ok(None),
                    Err(error) => {
                        error!(%error, "failed to calculate finalized transaction approvals hash");
                        Err(FatalStorageError::UnexpectedSerializationFailure(error))
                    }
                }
            }
        }
    }

    /// Retrieves a single transaction along with its finalized approvals.
    #[allow(clippy::type_complexity)]
    fn get_transaction_with_finalized_approvals(
        txn: &(impl DataReader<TransactionHash, Transaction>
              + DataReader<TransactionHash, BTreeSet<Approval>>),
        transaction_hash: &TransactionHash,
    ) -> Result<Option<(Transaction, Option<BTreeSet<Approval>>)>, FatalStorageError> {
        let maybe_transaction: Option<Transaction> = txn.read(*transaction_hash)?;
        let transaction = match maybe_transaction {
            Some(transaction) => transaction,
            None => return Ok(None),
        };

        let maybe_finalized_approvals: Option<BTreeSet<Approval>> = txn.read(*transaction_hash)?;
        let ret = (transaction, maybe_finalized_approvals);

        Ok(Some(ret))
    }

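    /// Creates a `SyncLeap` for the given identifier from locally held block headers and
    /// signatures.
    ///
    /// Returns `FetchResponse::NotFound` if the trusted block header, its ancestor headers or the
    /// required switch block data cannot be read, and `FetchResponse::NotProvided` if the trusted
    /// block is more than `recent_era_count` eras behind the highest complete block.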
    pub(crate) fn get_sync_leap(
        &self,
        sync_leap_identifier: SyncLeapIdentifier,
    ) -> Result<FetchResponse<SyncLeap, SyncLeapIdentifier>, FatalStorageError> {
        let block_hash = sync_leap_identifier.block_hash();

        let txn = self.block_store.checkout_ro()?;

        let only_from_available_block_range = true;
        let trusted_block_header = match self.get_single_block_header_restricted(
            &txn,
            &block_hash,
            only_from_available_block_range,
        )? {
            Some(trusted_block_header) => trusted_block_header,
            None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
        };

        let trusted_ancestor_headers =
            match self.get_trusted_ancestor_headers(&txn, &trusted_block_header)? {
                Some(trusted_ancestor_headers) => trusted_ancestor_headers,
                None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
            };

        // highest block and signatures are not requested
        if sync_leap_identifier.trusted_ancestor_only() {
            return Ok(FetchResponse::Fetched(SyncLeap {
                trusted_ancestor_only: true,
                trusted_block_header,
                trusted_ancestor_headers,
                block_headers_with_signatures: vec![],
            }));
        }

        let highest_complete_block_header =
            match self.get_highest_complete_block_header_with_signatures(&txn)? {
                Some(highest_complete_block_header) => highest_complete_block_header,
                None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
            };

        if highest_complete_block_header
            .block_header()
            .era_id()
            .saturating_sub(trusted_block_header.era_id().into())
            > self.recent_era_count.into()
        {
            return Ok(FetchResponse::NotProvided(sync_leap_identifier));
        }

        if highest_complete_block_header.block_header().height() == 0 {
            return Ok(FetchResponse::Fetched(SyncLeap {
                trusted_ancestor_only: false,
                trusted_block_header,
                trusted_ancestor_headers: vec![],
                block_headers_with_signatures: vec![],
            }));
        }

        // The `highest_complete_block_header` and `trusted_block_header` are both within the
        // highest complete block range, thus so are all the switch blocks between them.
        if let Some(block_headers_with_signatures) = self.get_block_headers_with_signatures(
            &txn,
            &trusted_block_header,
            &highest_complete_block_header,
        )? {
            return Ok(FetchResponse::Fetched(SyncLeap {
                trusted_ancestor_only: false,
                trusted_block_header,
                trusted_ancestor_headers,
                block_headers_with_signatures,
            }));
        }

        Ok(FetchResponse::NotFound(sync_leap_identifier))
    }

    /// Creates a serialized representation of a `FetchResponse` and the resulting message.
    ///
    /// If the given item is `Some`, returns a serialization of `FetchResponse::Fetched`. If
    /// enabled, the given serialization is also added to the in-memory pool.
    ///
    /// If the given item is `None`, returns a non-pooled serialization of
    /// `FetchResponse::NotFound`.
    fn update_pool_and_send<REv, T>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        sender: NodeId,
        serialized_id: &[u8],
        fetch_response: FetchResponse<T, T::Id>,
    ) -> Result<Effects<Event>, FatalStorageError>
    where
        REv: From<NetworkRequest<Message>> + Send,
        T: FetchItem,
    {
        let serialized = fetch_response
            .to_serialized()
            .map_err(FatalStorageError::StoredItemSerializationFailure)?;
        let shared: Arc<[u8]> = serialized.into();

        if self.enable_mem_deduplication && fetch_response.was_found() {
            self.serialized_item_pool
                .put(serialized_id.into(), Arc::downgrade(&shared));
        }

        let message = Message::new_get_response_from_serialized(<T as FetchItem>::TAG, shared);
        Ok(effect_builder.send_message(sender, message).ignore())
    }

    /// Returns `true` if the storage should attempt to return a block. Depending on the
    /// `only_from_available_block_range` flag, the check is either unconditional or restricted to
    /// the available block range.
    fn should_return_block(
        &self,
        block_height: u64,
        only_from_available_block_range: bool,
    ) -> bool {
        if only_from_available_block_range {
            self.get_available_block_range().contains(block_height)
        } else {
            true
        }
    }

    pub(crate) fn get_available_block_range(&self) -> AvailableBlockRange {
        match self.completed_blocks.highest_sequence() {
            Some(&seq) => seq.into(),
            None => AvailableBlockRange::RANGE_0_0,
        }
    }

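    /// Returns the header of the lowest block in the highest completed sequence, or a diagnostic
    /// variant if there is no completed sequence or the block cannot be read.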
    pub(crate) fn get_highest_orphaned_block_header(&self) -> HighestOrphanedBlockResult {
        match self.completed_blocks.highest_sequence() {
            None => HighestOrphanedBlockResult::MissingHighestSequence,
            Some(seq) => {
                let low = seq.low();
                let txn = self
                    .block_store
                    .checkout_ro()
                    .expect("Could not start transaction for lmdb");

                match txn.read(low) {
                    Ok(Some(block)) => match block {
                        Block::V1(_) | Block::V2(_) => {
                            HighestOrphanedBlockResult::Orphan(block.clone_header())
                        }
                    },
                    Ok(None) | Err(_) => HighestOrphanedBlockResult::MissingHeader(low),
                }
            }
        }
    }

    /// Returns `count` highest switch block headers, sorted from lowest (oldest) to highest.
    pub(crate) fn read_highest_switch_block_headers(
        &self,
        count: u64,
    ) -> Result<Vec<BlockHeader>, FatalStorageError> {
        let txn = self.block_store.checkout_ro()?;
        if let Some(last_era_header) =
            DataReader::<LatestSwitchBlock, BlockHeader>::read(&txn, LatestSwitchBlock)?
        {
            let mut result = vec![];
            let last_era_id = last_era_header.era_id();
            result.push(last_era_header);
            for era_id in (0..last_era_id.value())
                .rev()
                .take(count as usize)
                .map(EraId::new)
            {
                match txn.read(era_id)? {
                    None => break,
                    Some(header) => result.push(header),
                }
            }
            result.reverse();
            debug!(
                ?result,
                "Storage: read_highest_switch_block_headers count:({})", count
            );
            Ok(result)
        } else {
            Ok(vec![])
        }
    }

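    /// Reads the execution results for the block identified by the request and returns them as a
    /// `BlockExecutionResultsOrChunk` for the requested chunk index, or `None` if the block or any
    /// of its execution results is missing.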
    fn read_block_execution_results_or_chunk(
        &self,
        request: &BlockExecutionResultsOrChunkId,
    ) -> Result<Option<BlockExecutionResultsOrChunk>, FatalStorageError> {
        let txn = self.block_store.checkout_ro()?;

        let execution_results = match Self::get_execution_results(&txn, request.block_hash())? {
            Some(execution_results) => execution_results
                .into_iter()
                .map(|(_deploy_hash, execution_result)| execution_result)
                .collect(),
            None => return Ok(None),
        };
        Ok(BlockExecutionResultsOrChunk::new(
            *request.block_hash(),
            request.chunk_index(),
            execution_results,
        ))
    }

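    /// Creates an empty `BlockSignatures` collection of the variant matching the given block's
    /// version.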
    fn get_default_block_signatures(&self, block: &Block) -> BlockSignatures {
        match block {
            Block::V1(block) => BlockSignaturesV1::new(*block.hash(), block.era_id()).into(),
            Block::V2(block) => BlockSignaturesV2::new(
                *block.hash(),
                block.height(),
                block.era_id(),
                self.chain_name_hash,
            )
            .into(),
        }
    }

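    /// Updates the chain height and available block range metrics from the highest completed
    /// sequence, if metrics are enabled.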
    fn update_chain_height_metrics(&self) {
        if let Some(metrics) = self.metrics.as_ref() {
            if let Some(sequence) = self.completed_blocks.highest_sequence() {
                let highest_available_block: i64 = sequence.high().try_into().unwrap_or(i64::MIN);
                let lowest_available_block: i64 = sequence.low().try_into().unwrap_or(i64::MIN);
                metrics.chain_height.set(highest_available_block);
                metrics.highest_available_block.set(highest_available_block);
                metrics.lowest_available_block.set(lowest_available_block);
            }
        }
    }

    pub(crate) fn read_block_header_by_hash(
        &self,
        block_hash: &BlockHash,
    ) -> Result<Option<BlockHeader>, FatalStorageError> {
        let ro_txn = self.block_store.checkout_ro()?;

        ro_txn.read(*block_hash).map_err(FatalStorageError::from)
    }

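    /// Reads the execution result of every transaction in the given block, returning `None` if
    /// the block or any of the execution results is missing.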
    fn get_execution_results(
        txn: &(impl DataReader<BlockHash, Block> + DataReader<TransactionHash, ExecutionResult>),
        block_hash: &BlockHash,
    ) -> Result<Option<Vec<(TransactionHash, ExecutionResult)>>, FatalStorageError> {
        let block = txn.read(*block_hash)?;

        let block_body = match block {
            Some(block) => block.take_body(),
            None => return Ok(None),
        };

        let transaction_hashes: Vec<TransactionHash> = match block_body {
            BlockBody::V1(v1) => v1
                .deploy_and_transfer_hashes()
                .map(TransactionHash::from)
                .collect(),
            BlockBody::V2(v2) => v2.all_transactions().copied().collect(),
        };
        let mut execution_results = vec![];
        for transaction_hash in transaction_hashes {
            match txn.read(transaction_hash)? {
                None => {
                    debug!(
                        %block_hash,
                        %transaction_hash,
                        "retrieved block but execution result for given transaction is absent"
                    );
                    return Ok(None);
                }
                Some(execution_result) => {
                    execution_results.push((transaction_hash, execution_result));
                }
            }
        }
        Ok(Some(execution_results))
    }

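    /// Like `get_execution_results`, but additionally reads each transaction to pair its header
    /// with the execution result; returns `None` if any of the transactions is missing.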
    #[allow(clippy::type_complexity)]
    fn get_execution_results_with_transaction_headers(
        txn: &(impl DataReader<BlockHash, Block>
              + DataReader<TransactionHash, ExecutionResult>
              + DataReader<TransactionHash, Transaction>),
        block_hash: &BlockHash,
    ) -> Result<Option<Vec<(TransactionHash, TransactionHeader, ExecutionResult)>>, FatalStorageError>
    {
        let execution_results = match Self::get_execution_results(txn, block_hash)? {
            Some(execution_results) => execution_results,
            None => return Ok(None),
        };

        let mut ret = Vec::with_capacity(execution_results.len());
        for (transaction_hash, execution_result) in execution_results {
            match txn.read(transaction_hash)? {
                None => {
                    error!(
                        %block_hash,
                        %transaction_hash,
                        "missing transaction"
                    );
                    return Ok(None);
                }
                Some(Transaction::Deploy(deploy)) => ret.push((
                    transaction_hash,
                    deploy.take_header().into(),
                    execution_result,
                )),
                Some(Transaction::V1(transaction_v1)) => {
                    ret.push((transaction_hash, (&transaction_v1).into(), execution_result))
                }
            };
        }
        Ok(Some(ret))
    }

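    /// Records the utilization of the given block within its era and returns the era's summed
    /// utilization together with the number of blocks tracked so far, pruning tracked eras that
    /// are more than two eras older than `era_id`.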
    fn get_block_utilization_score(
        &mut self,
        era_id: EraId,
        block_height: u64,
        block_utilization: u64,
    ) -> Option<(u64, u64)> {
        let ret = match self.utilization_tracker.get_mut(&era_id) {
            Some(utilization) => {
                utilization.entry(block_height).or_insert(block_utilization);

                let transaction_count = utilization.values().sum();
                let block_count = utilization.keys().len() as u64;

                Some((transaction_count, block_count))
            }
            None => {
                let mut utilization = BTreeMap::new();
                utilization.insert(block_height, block_utilization);

                self.utilization_tracker.insert(era_id, utilization);

                let block_count = 1u64;
                Some((block_utilization, block_count))
            }
        };

        self.utilization_tracker
            .retain(|key_era_id, _| key_era_id.value() + 2 >= era_id.value());

        ret
    }
}

/// Decodes an item's ID, typically from an incoming request.
fn decode_item_id<T>(raw: &[u8]) -> Result<T::Id, GetRequestError>
where
    T: FetchItem,
{
    bincode::deserialize(raw).map_err(GetRequestError::MalformedIncomingItemId)
}

fn should_move_storage_files_to_network_subdir(
    root: &Path,
    file_names: &[&str],
) -> Result<bool, FatalStorageError> {
    let mut files_found = vec![];
    let mut files_not_found = vec![];

    for file_name in file_names {
        let file_path = root.join(file_name);

        if file_path.exists() {
            files_found.push(file_path);
        } else {
            files_not_found.push(file_path);
        }
    }

    let should_move_files = files_found.len() == file_names.len();

    if !should_move_files && !files_found.is_empty() {
        error!(
            "found storage files: {:?}, missing storage files: {:?}",
            files_found, files_not_found
        );

        return Err(FatalStorageError::MissingStorageFiles {
            missing_files: files_not_found,
        });
    }

    Ok(should_move_files)
}

fn move_storage_files_to_network_subdir(
    root: &Path,
    subdir: &Path,
    file_names: &[&str],
) -> Result<(), FatalStorageError> {
    file_names
        .iter()
        .map(|file_name| {
            let source_path = root.join(file_name);
            let dest_path = subdir.join(file_name);
            fs::rename(&source_path, &dest_path).map_err(|original_error| {
                FatalStorageError::UnableToMoveFile {
                    source_path,
                    dest_path,
                    original_error,
                }
            })
        })
        .collect::<Result<Vec<_>, FatalStorageError>>()?;

    info!(
        "moved files: {:?} from: {:?} to: {:?}",
        file_names, root, subdir
    );
    Ok(())
}

/// Returns all transfers recorded by a successful execution, or an empty `Vec` if the execution
/// failed. For V1 results the transfers are gathered from `WriteTransfer` transforms; for V2
/// results they are taken directly from the result's `transfers` field.
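///
/// A minimal illustration of the rule (the two execution results are hypothetical values):
///
/// ```ignore
/// // A V1 failure, or a V2 result carrying an error message, yields no transfers ...
/// assert!(successful_transfers(&failed_result).is_empty());
/// // ... while a successful result yields every transfer recorded in its effects.
/// let transfers = successful_transfers(&successful_result);
/// ```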
fn successful_transfers(execution_result: &ExecutionResult) -> Vec<Transfer> {
    let mut all_transfers: Vec<Transfer> = vec![];
    match execution_result {
        ExecutionResult::V1(ExecutionResultV1::Success { effect, .. }) => {
            for transform_v1 in &effect.transforms {
                if let execution_result_v1::TransformKindV1::WriteTransfer(transfer_v1) =
                    &transform_v1.transform
                {
                    all_transfers.push(Transfer::V1(transfer_v1.clone()));
                }
            }
        }
        ExecutionResult::V2(execution_result_v2) => {
            if execution_result_v2.error_message.is_none() {
                for transfer in &execution_result_v2.transfers {
                    all_transfers.push(transfer.clone());
                }
            }
            // else no-op: we only record transfers from successful executions.
        }
        ExecutionResult::V1(ExecutionResultV1::Failure { .. }) => {
            // No-op: we only record transfers from successful executions.
        }
    }
    all_transfers
}

// Testing code. The functions below allow direct inspection of the storage component and should
// only ever be used when writing tests.
#[cfg(test)]
impl Storage {
    /// Directly returns a transaction with finalized approvals from the internal store.
    ///
    /// # Panics
    ///
    /// Panics if an IO error occurs.
    pub(crate) fn get_transaction_with_finalized_approvals_by_hash(
        &self,
        transaction_hash: &TransactionHash,
    ) -> Option<(Transaction, Option<BTreeSet<Approval>>)> {
        let txn = self
            .block_store
            .checkout_ro()
            .expect("could not create RO transaction");
        Self::get_transaction_with_finalized_approvals(&txn, transaction_hash)
            .expect("could not retrieve a transaction with finalized approvals from storage")
    }

    /// Directly returns an execution result from the internal store.
    ///
    /// # Panics
    ///
    /// Panics if an IO error occurs.
    pub(crate) fn read_execution_result(
        &self,
        transaction_hash: &TransactionHash,
    ) -> Option<ExecutionResult> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(*transaction_hash)
            .expect("could not retrieve execution result from storage")
    }

    /// Directly returns a transaction from the internal store.
    ///
    /// # Panics
    ///
    /// Panics if an IO error occurs.
    pub(crate) fn get_transaction_by_hash(
        &self,
        transaction_hash: TransactionHash,
    ) -> Option<Transaction> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(transaction_hash)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn read_block_by_hash(&self, block_hash: BlockHash) -> Option<Block> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(block_hash)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn read_block_by_height(&self, height: u64) -> Option<Block> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(height)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn read_highest_block(&self) -> Option<Block> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(Tip)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn read_highest_block_header(&self) -> Option<BlockHeader> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(Tip)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn get_finality_signatures_for_block(
        &self,
        block_hash: BlockHash,
    ) -> Option<BlockSignatures> {
        let txn = self
            .block_store
            .checkout_ro()
            .expect("could not create RO transaction");
        let res: Option<BlockSignatures> = txn
            .read(block_hash)
            .expect("could not retrieve value from storage");
        txn.commit().expect("Could not commit transaction");
        res
    }

    pub(crate) fn read_switch_block_by_era_id(&self, era_id: EraId) -> Option<Block> {
        self.block_store
            .checkout_ro()
            .expect("could not create RO transaction")
            .read(era_id)
            .expect("could not retrieve value from storage")
    }

    pub(crate) fn read_block_with_signatures_by_hash(
        &self,
        block_hash: BlockHash,
        only_from_available_block_range: bool,
    ) -> Option<BlockWithSignatures> {
        let ro_txn = self
            .block_store
            .checkout_ro()
            .expect("should create ro txn");
        let block: Block = ro_txn.read(block_hash).expect("should read block")?;

        if !self.should_return_block(block.height(), only_from_available_block_range) {
            return None;
        }
        if block_hash != *block.hash() {
            error!(
                queried_block_hash = ?block_hash,
                actual_block_hash = ?block.hash(),
                "block not stored under hash"
            );
            debug_assert_eq!(&block_hash, block.hash());
            return None;
        }
        let block_signatures = ro_txn
            .read(block_hash)
            .expect("should read block signatures")
            .unwrap_or_else(|| self.get_default_block_signatures(&block));
        if block_signatures.is_verified().is_err() {
            error!(?block, "invalid block signatures for block");
            debug_assert!(block_signatures.is_verified().is_ok());
            return None;
        }
        Some(BlockWithSignatures::new(block, block_signatures))
    }

    pub(crate) fn read_block_with_signatures_by_height(
        &self,
        height: u64,
        only_from_available_block_range: bool,
    ) -> Option<BlockWithSignatures> {
        if !self.should_return_block(height, only_from_available_block_range) {
            return None;
        }
        let ro_txn = self
            .block_store
            .checkout_ro()
            .expect("should create ro txn");
        let block: Block = ro_txn.read(height).expect("should read block")?;
        let hash = block.hash();
        let block_signatures = ro_txn
            .read(*hash)
            .expect("should read block signatures")
            .unwrap_or_else(|| self.get_default_block_signatures(&block));
        Some(BlockWithSignatures::new(block, block_signatures))
    }

    pub(crate) fn read_highest_block_with_signatures(
        &self,
        only_from_available_block_range: bool,
    ) -> Option<BlockWithSignatures> {
        let ro_txn = self
            .block_store
            .checkout_ro()
            .expect("should create ro txn");
        let highest_block = if only_from_available_block_range {
            let height = self.highest_complete_block_height()?;
            ro_txn.read(height).expect("should read block")?
        } else {
            DataReader::<Tip, Block>::read(&ro_txn, Tip).expect("should read block")?
        };
        let hash = highest_block.hash();
        let block_signatures = match ro_txn.read(*hash).expect("should read block signatures") {
            Some(signatures) => signatures,
            None => self.get_default_block_signatures(&highest_block),
        };
        Some(BlockWithSignatures::new(highest_block, block_signatures))
    }

    pub(crate) fn read_execution_info(
        &self,
        transaction_hash: TransactionHash,
    ) -> Option<ExecutionInfo> {
        let txn = self
            .block_store
            .checkout_ro()
            .expect("should create ro txn");
        let block_hash_and_height: BlockHashHeightAndEra = txn
            .read(transaction_hash)
            .expect("should read block hash and height")?;
        let execution_result = txn
            .read(transaction_hash)
            .expect("should read execution result");
        Some(ExecutionInfo {
            block_hash: block_hash_and_height.block_hash,
            block_height: block_hash_and_height.block_height,
            execution_result,
        })
    }
}