1mod config;
34pub(crate) mod disjoint_sequences;
35mod error;
36mod event;
37mod metrics;
38mod object_pool;
39#[cfg(test)]
40mod tests;
41mod utils;
42
43use casper_storage::block_store::{
44 lmdb::{IndexedLmdbBlockStore, LmdbBlockStore},
45 types::{
46 ApprovalsHashes, BlockExecutionResults, BlockHashHeightAndEra, BlockHeight, BlockTransfers,
47 LatestSwitchBlock, StateStore, StateStoreKey, Tip, TransactionFinalizedApprovals,
48 },
49 BlockStoreError, BlockStoreProvider, BlockStoreTransaction, DataReader, DataWriter,
50};
51
52use std::{
53 borrow::Cow,
54 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
55 convert::TryInto,
56 fmt::{self, Display, Formatter},
57 fs::{self, OpenOptions},
58 io::ErrorKind,
59 path::{Path, PathBuf},
60 sync::Arc,
61};
62
63use casper_storage::DbRawBytesSpec;
64#[cfg(test)]
65use casper_types::BlockWithSignatures;
66use casper_types::{
67 bytesrepr::{FromBytes, ToBytes},
68 execution::{execution_result_v1, ExecutionResult, ExecutionResultV1},
69 Approval, ApprovalsHash, AvailableBlockRange, Block, BlockBody, BlockHash, BlockHeader,
70 BlockHeaderWithSignatures, BlockSignatures, BlockSignaturesV1, BlockSignaturesV2, BlockV2,
71 ChainNameDigest, DeployHash, EraId, ExecutionInfo, FinalitySignature, ProtocolVersion,
72 Timestamp, Transaction, TransactionConfig, TransactionHash, TransactionId, Transfer, U512,
73};
74use datasize::DataSize;
75use num_rational::Ratio;
76use prometheus::Registry;
77use smallvec::SmallVec;
78use tracing::{debug, error, info, warn};
79
80use crate::{
81 components::{
82 fetcher::{FetchItem, FetchResponse},
83 Component,
84 },
85 effect::{
86 announcements::FatalAnnouncement,
87 incoming::{NetRequest, NetRequestIncoming},
88 requests::{MarkBlockCompletedRequest, NetworkRequest, StorageRequest},
89 EffectBuilder, EffectExt, Effects,
90 },
91 fatal,
92 protocol::Message,
93 types::{
94 BlockExecutionResultsOrChunk, BlockExecutionResultsOrChunkId, BlockWithMetadata,
95 ExecutableBlock, LegacyDeploy, MaxTtl, NodeId, NodeRng, SyncLeap, SyncLeapIdentifier,
96 TransactionHeader, VariantMismatch,
97 },
98 utils::{display_error, WithDir},
99};
100
101pub use config::Config;
102use disjoint_sequences::{DisjointSequences, Sequence};
103pub use error::FatalStorageError;
104use error::GetRequestError;
105pub(crate) use event::Event;
106use metrics::Metrics;
107use object_pool::ObjectPool;
108
109const COMPONENT_NAME: &str = "storage";
110
111const COMPLETED_BLOCKS_STORAGE_KEY: &[u8] = b"completed_blocks_disjoint_sequences";
113const FORCE_RESYNC_FILE_NAME: &str = "force_resync";
115
116const STORAGE_FILES: [&str; 5] = [
117 "data.lmdb",
118 "data.lmdb-lock",
119 "storage.lmdb",
120 "storage.lmdb-lock",
121 "sse_index",
122];
123
124#[derive(DataSize, Debug)]
126pub struct Storage {
127 root: PathBuf,
129 pub(crate) block_store: IndexedLmdbBlockStore,
131 completed_blocks: DisjointSequences,
133 activation_era: EraId,
135 key_block_height_for_activation_point: Option<u64>,
137 enable_mem_deduplication: bool,
139 serialized_item_pool: ObjectPool<Box<[u8]>>,
143 recent_era_count: u64,
146 #[data_size(skip)]
147 metrics: Option<Metrics>,
148 max_ttl: MaxTtl,
150 chain_name_hash: ChainNameDigest,
152 transaction_config: TransactionConfig,
154 utilization_tracker: BTreeMap<EraId, BTreeMap<u64, u64>>,
156}
157
158pub(crate) enum HighestOrphanedBlockResult {
159 MissingHighestSequence,
160 Orphan(BlockHeader),
161 MissingHeader(u64),
162}
163
164impl Display for HighestOrphanedBlockResult {
165 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
166 match self {
167 HighestOrphanedBlockResult::MissingHighestSequence => {
168 write!(f, "missing highest sequence")
169 }
170 HighestOrphanedBlockResult::Orphan(block_header) => write!(
171 f,
172 "orphan, height={}, hash={}",
173 block_header.height(),
174 block_header.block_hash()
175 ),
176 HighestOrphanedBlockResult::MissingHeader(height) => {
177 write!(f, "missing header for block at height: {}", height)
178 }
179 }
180 }
181}
182
183impl<REv> Component<REv> for Storage
184where
185 REv: From<FatalAnnouncement> + From<NetworkRequest<Message>> + Send,
186{
187 type Event = Event;
188
189 fn handle_event(
190 &mut self,
191 effect_builder: EffectBuilder<REv>,
192 _rng: &mut NodeRng,
193 event: Self::Event,
194 ) -> Effects<Self::Event> {
195 let result = match event {
196 Event::StorageRequest(req) => self.handle_storage_request(*req),
197 Event::NetRequestIncoming(ref incoming) => {
198 match self.handle_net_request_incoming::<REv>(effect_builder, incoming) {
199 Ok(effects) => Ok(effects),
200 Err(GetRequestError::Fatal(fatal_error)) => Err(fatal_error),
201 Err(ref other_err) => {
202 warn!(
203 sender=%incoming.sender,
204 err=display_error(other_err),
205 "error handling net request"
206 );
207 Ok(Effects::new())
211 }
212 }
213 }
214 Event::MarkBlockCompletedRequest(req) => self.handle_mark_block_completed_request(req),
215 Event::MakeBlockExecutableRequest(req) => {
216 let ret = self.make_executable_block(&req.block_hash);
217 match ret {
218 Ok(maybe) => Ok(req.responder.respond(maybe).ignore()),
219 Err(err) => Err(err),
220 }
221 }
222 };
223
224 match result {
228 Ok(effects) => effects,
229 Err(err) => fatal!(effect_builder, "storage error: {}", err).ignore(),
230 }
231 }
232
233 fn name(&self) -> &str {
234 COMPONENT_NAME
235 }
236}
237
238impl Storage {
239 #[allow(clippy::too_many_arguments)]
241 pub fn new(
242 cfg: &WithDir<Config>,
243 hard_reset_to_start_of_era: Option<EraId>,
244 protocol_version: ProtocolVersion,
245 activation_era: EraId,
246 network_name: &str,
247 max_ttl: MaxTtl,
248 recent_era_count: u64,
249 registry: Option<&Registry>,
250 force_resync: bool,
251 transaction_config: TransactionConfig,
252 ) -> Result<Self, FatalStorageError> {
253 let config = cfg.value();
254
255 let mut root = cfg.with_dir(config.path.clone());
257 let network_subdir = root.join(network_name);
258
259 if !network_subdir.exists() {
260 fs::create_dir_all(&network_subdir).map_err(|err| {
261 FatalStorageError::CreateDatabaseDirectory(network_subdir.clone(), err)
262 })?;
263 }
264
265 if should_move_storage_files_to_network_subdir(&root, &STORAGE_FILES)? {
266 move_storage_files_to_network_subdir(&root, &network_subdir, &STORAGE_FILES)?;
267 }
268
269 root = network_subdir;
270
271 let total_size = config
273 .max_block_store_size
274 .saturating_add(config.max_deploy_store_size)
275 .saturating_add(config.max_deploy_metadata_store_size);
276
277 let block_store = LmdbBlockStore::new(root.as_path(), total_size)?;
278 let indexed_block_store =
279 IndexedLmdbBlockStore::new(block_store, hard_reset_to_start_of_era, protocol_version)?;
280
281 let metrics = registry.map(Metrics::new).transpose()?;
282
283 let mut component = Self {
284 root,
285 block_store: indexed_block_store,
286 completed_blocks: Default::default(),
287 activation_era,
288 key_block_height_for_activation_point: None,
289 enable_mem_deduplication: config.enable_mem_deduplication,
290 serialized_item_pool: ObjectPool::new(config.mem_pool_prune_interval),
291 recent_era_count,
292 max_ttl,
293 utilization_tracker: BTreeMap::new(),
294 metrics,
295 chain_name_hash: ChainNameDigest::from_chain_name(network_name),
296 transaction_config,
297 };
298
299 if force_resync {
300 let force_resync_file_path = component.root_path().join(FORCE_RESYNC_FILE_NAME);
301 match OpenOptions::new()
306 .create_new(true)
307 .write(true)
308 .open(&force_resync_file_path)
309 {
310 Ok(_file) => {
311 info!("initializing force resync");
314 component.completed_blocks = Default::default();
316 component.persist_completed_blocks()?;
317 return Ok(component);
319 }
320 Err(io_err) if io_err.kind() == ErrorKind::AlreadyExists => {
321 info!("skipping force resync as marker file exists");
322 }
323 Err(io_err) => {
324 warn!(
325 "couldn't operate on the force resync marker file at path {}: {}",
326 force_resync_file_path.to_string_lossy(),
327 io_err
328 );
329 }
330 }
331 }
332
333 {
334 let ro_txn = component.block_store.checkout_ro()?;
335 let maybe_state_store: Option<Vec<u8>> = ro_txn.read(StateStoreKey::new(
336 Cow::Borrowed(COMPLETED_BLOCKS_STORAGE_KEY),
337 ))?;
338 match maybe_state_store {
339 Some(raw) => {
340 let (mut sequences, _) = DisjointSequences::from_vec(raw)
341 .map_err(FatalStorageError::UnexpectedDeserializationFailure)?;
342
343 if let Some(header) = DataReader::<Tip, BlockHeader>::read(&ro_txn, Tip)? {
345 sequences.truncate(header.height());
346 }
347
348 component.completed_blocks = sequences;
349 }
350 None => {
351 let maybe_block_header: Option<BlockHeader> = ro_txn.read(Tip)?;
370 if let Some(highest_block_header) = maybe_block_header {
371 for height in (0..=highest_block_header.height()).rev() {
372 let maybe_header: Option<BlockHeader> = ro_txn.read(height)?;
373 match maybe_header {
374 Some(header) if header.protocol_version() < protocol_version => {
375 component.completed_blocks =
376 DisjointSequences::new(Sequence::new(0, header.height()));
377 break;
378 }
379 _ => {}
380 }
381 }
382 };
383 }
384 }
385 }
386 component.persist_completed_blocks()?;
387 Ok(component)
388 }
389
390 pub(crate) fn root_path(&self) -> &Path {
392 &self.root
393 }
394
395 fn handle_net_request_incoming<REv>(
396 &mut self,
397 effect_builder: EffectBuilder<REv>,
398 incoming: &NetRequestIncoming,
399 ) -> Result<Effects<Event>, GetRequestError>
400 where
401 REv: From<NetworkRequest<Message>> + Send,
402 {
403 if self.enable_mem_deduplication {
404 let unique_id = incoming.message.unique_id();
405
406 if let Some(serialized_item) = self
407 .serialized_item_pool
408 .get(AsRef::<[u8]>::as_ref(&unique_id))
409 {
410 let found = Message::new_get_response_from_serialized(
414 incoming.message.tag(),
415 serialized_item,
416 );
417 return Ok(effect_builder.send_message(incoming.sender, found).ignore());
418 }
419 }
420
421 match *(incoming.message) {
422 NetRequest::Transaction(ref serialized_id) => {
423 let id = decode_item_id::<Transaction>(serialized_id)?;
424 let opt_item = self.get_transaction_by_id(id)?;
425 let fetch_response = FetchResponse::from_opt(id, opt_item);
426
427 Ok(self.update_pool_and_send(
428 effect_builder,
429 incoming.sender,
430 serialized_id,
431 fetch_response,
432 )?)
433 }
434 NetRequest::LegacyDeploy(ref serialized_id) => {
435 let id = decode_item_id::<LegacyDeploy>(serialized_id)?;
436 let opt_item = self.get_legacy_deploy(id)?;
437 let fetch_response = FetchResponse::from_opt(id, opt_item);
438
439 Ok(self.update_pool_and_send(
440 effect_builder,
441 incoming.sender,
442 serialized_id,
443 fetch_response,
444 )?)
445 }
446 NetRequest::Block(ref serialized_id) => {
447 let id = decode_item_id::<Block>(serialized_id)?;
448 let opt_item: Option<Block> = self
449 .block_store
450 .checkout_ro()
451 .map_err(FatalStorageError::from)?
452 .read(id)
453 .map_err(FatalStorageError::from)?;
454 let fetch_response = FetchResponse::from_opt(id, opt_item);
455
456 Ok(self.update_pool_and_send(
457 effect_builder,
458 incoming.sender,
459 serialized_id,
460 fetch_response,
461 )?)
462 }
463 NetRequest::BlockHeader(ref serialized_id) => {
464 let item_id = decode_item_id::<BlockHeader>(serialized_id)?;
465 let opt_item: Option<BlockHeader> = self
466 .block_store
467 .checkout_ro()
468 .map_err(FatalStorageError::from)?
469 .read(item_id)
470 .map_err(FatalStorageError::from)?;
471 let fetch_response = FetchResponse::from_opt(item_id, opt_item);
472
473 Ok(self.update_pool_and_send(
474 effect_builder,
475 incoming.sender,
476 serialized_id,
477 fetch_response,
478 )?)
479 }
480 NetRequest::FinalitySignature(ref serialized_id) => {
481 let id = decode_item_id::<FinalitySignature>(serialized_id)?;
482 let opt_item = self
483 .block_store
484 .checkout_ro()
485 .map_err(FatalStorageError::from)?
486 .read(*id.block_hash())
487 .map_err(FatalStorageError::from)?
488 .and_then(|block_signatures: BlockSignatures| {
489 block_signatures.finality_signature(id.public_key())
490 });
491
492 if let Some(item) = opt_item.as_ref() {
493 if item.block_hash() != id.block_hash() || item.era_id() != id.era_id() {
494 return Err(GetRequestError::FinalitySignatureIdMismatch {
495 requested_id: id,
496 finality_signature: Box::new(item.clone()),
497 });
498 }
499 }
500 let fetch_response = FetchResponse::from_opt(id, opt_item);
501
502 Ok(self.update_pool_and_send(
503 effect_builder,
504 incoming.sender,
505 serialized_id,
506 fetch_response,
507 )?)
508 }
509 NetRequest::SyncLeap(ref serialized_id) => {
510 let item_id = decode_item_id::<SyncLeap>(serialized_id)?;
511 let fetch_response = self.get_sync_leap(item_id)?;
512
513 Ok(self.update_pool_and_send(
514 effect_builder,
515 incoming.sender,
516 serialized_id,
517 fetch_response,
518 )?)
519 }
520 NetRequest::ApprovalsHashes(ref serialized_id) => {
521 let item_id = decode_item_id::<ApprovalsHashes>(serialized_id)?;
522 let opt_item: Option<ApprovalsHashes> = self
523 .block_store
524 .checkout_ro()
525 .map_err(FatalStorageError::from)?
526 .read(item_id)
527 .map_err(FatalStorageError::from)?;
528 let fetch_response = FetchResponse::from_opt(item_id, opt_item);
529
530 Ok(self.update_pool_and_send(
531 effect_builder,
532 incoming.sender,
533 serialized_id,
534 fetch_response,
535 )?)
536 }
537 NetRequest::BlockExecutionResults(ref serialized_id) => {
538 let item_id = decode_item_id::<BlockExecutionResultsOrChunk>(serialized_id)?;
539 let opt_item = self.read_block_execution_results_or_chunk(&item_id)?;
540 let fetch_response = FetchResponse::from_opt(item_id, opt_item);
541
542 Ok(self.update_pool_and_send(
543 effect_builder,
544 incoming.sender,
545 serialized_id,
546 fetch_response,
547 )?)
548 }
549 }
550 }
551
552 fn handle_storage_request(
554 &mut self,
555 req: StorageRequest,
556 ) -> Result<Effects<Event>, FatalStorageError> {
557 Ok(match req {
561 StorageRequest::PutBlock { block, responder } => {
562 let mut rw_txn = self.block_store.checkout_rw()?;
563 let _ = rw_txn.write(&*block)?;
564 rw_txn.commit()?;
565 responder.respond(true).ignore()
566 }
567 StorageRequest::PutApprovalsHashes {
568 approvals_hashes,
569 responder,
570 } => {
571 let mut rw_txn = self.block_store.checkout_rw()?;
572 let _ = rw_txn.write(&*approvals_hashes)?;
573 rw_txn.commit()?;
574 responder.respond(true).ignore()
575 }
576 StorageRequest::GetBlock {
577 block_hash,
578 responder,
579 } => {
580 let maybe_block = self.block_store.checkout_ro()?.read(block_hash)?;
581 responder.respond(maybe_block).ignore()
582 }
583 StorageRequest::IsBlockStored {
584 block_hash,
585 responder,
586 } => {
587 let txn = self.block_store.checkout_ro()?;
588 responder
589 .respond(DataReader::<BlockHash, Block>::exists(&txn, block_hash)?)
590 .ignore()
591 }
592 StorageRequest::GetApprovalsHashes {
593 block_hash,
594 responder,
595 } => responder
596 .respond(self.block_store.checkout_ro()?.read(block_hash)?)
597 .ignore(),
598 StorageRequest::GetHighestCompleteBlock { responder } => responder
599 .respond(self.get_highest_complete_block()?)
600 .ignore(),
601 StorageRequest::GetHighestCompleteBlockHeader { responder } => responder
602 .respond(self.get_highest_complete_block_header()?)
603 .ignore(),
604 StorageRequest::GetTransactionsEraIds {
605 transaction_hashes,
606 responder,
607 } => {
608 let mut era_ids = HashSet::new();
609 let txn = self.block_store.checkout_ro()?;
610 for transaction_hash in &transaction_hashes {
611 let maybe_block_info: Option<BlockHashHeightAndEra> =
612 txn.read(*transaction_hash)?;
613 if let Some(block_info) = maybe_block_info {
614 era_ids.insert(block_info.era_id);
615 }
616 }
617 responder.respond(era_ids).ignore()
618 }
619 StorageRequest::GetBlockHeader {
620 block_hash,
621 only_from_available_block_range,
622 responder,
623 } => {
624 let txn = self.block_store.checkout_ro()?;
625 responder
626 .respond(self.get_single_block_header_restricted(
627 &txn,
628 &block_hash,
629 only_from_available_block_range,
630 )?)
631 .ignore()
632 }
633 StorageRequest::GetBlockTransfers {
634 block_hash,
635 responder,
636 } => {
637 let maybe_transfers = self.get_transfers(&block_hash)?;
638 responder.respond(maybe_transfers).ignore()
639 }
640 StorageRequest::PutTransaction {
641 transaction,
642 responder,
643 } => {
644 let mut rw_txn = self.block_store.checkout_rw()?;
645 if DataReader::<TransactionHash, Transaction>::exists(&rw_txn, transaction.hash())?
646 {
647 responder.respond(false).ignore()
648 } else {
649 let _ = rw_txn.write(&*transaction)?;
650 rw_txn.commit()?;
651 responder.respond(true).ignore()
652 }
653 }
654 StorageRequest::GetTransactions {
655 transaction_hashes,
656 responder,
657 } => responder
658 .respond(self.get_transactions_with_finalized_approvals(transaction_hashes.iter())?)
659 .ignore(),
660 StorageRequest::GetLegacyDeploy {
661 deploy_hash,
662 responder,
663 } => {
664 let maybe_legacy_deploy = self.get_legacy_deploy(deploy_hash)?;
665 responder.respond(maybe_legacy_deploy).ignore()
666 }
667 StorageRequest::GetTransaction {
668 transaction_id,
669 responder,
670 } => {
671 let ro_txn = self.block_store.checkout_ro()?;
672 let maybe_transaction = match Self::get_transaction_with_finalized_approvals(
673 &ro_txn,
674 &transaction_id.transaction_hash(),
675 )? {
676 None => None,
677 Some((transaction, maybe_approvals)) => {
678 let transaction = if let Some(approvals) = maybe_approvals {
679 transaction.with_approvals(approvals)
680 } else {
681 transaction
682 };
683 (transaction.fetch_id() == transaction_id).then_some(transaction)
684 }
685 };
686 responder.respond(maybe_transaction).ignore()
687 }
688 StorageRequest::GetTransactionAndExecutionInfo {
689 transaction_hash,
690 with_finalized_approvals,
691 responder,
692 } => {
693 let ro_txn = self.block_store.checkout_ro()?;
694
695 let transaction = if with_finalized_approvals {
696 match Self::get_transaction_with_finalized_approvals(
697 &ro_txn,
698 &transaction_hash,
699 )? {
700 Some((transaction, maybe_approvals)) => {
701 if let Some(approvals) = maybe_approvals {
702 transaction.with_approvals(approvals)
703 } else {
704 transaction
705 }
706 }
707 None => return Ok(responder.respond(None).ignore()),
708 }
709 } else {
710 match ro_txn.read(transaction_hash)? {
711 Some(transaction) => transaction,
712 None => return Ok(responder.respond(None).ignore()),
713 }
714 };
715
716 let block_hash_height_and_era: BlockHashHeightAndEra =
717 match ro_txn.read(transaction_hash)? {
718 Some(value) => value,
719 None => return Ok(responder.respond(Some((transaction, None))).ignore()),
720 };
721
722 let execution_result = ro_txn.read(transaction_hash)?;
723 let execution_info = ExecutionInfo {
724 block_hash: block_hash_height_and_era.block_hash,
725 block_height: block_hash_height_and_era.block_height,
726 execution_result,
727 };
728
729 responder
730 .respond(Some((transaction, Some(execution_info))))
731 .ignore()
732 }
733 StorageRequest::IsTransactionStored {
734 transaction_id,
735 responder,
736 } => {
737 let txn = self.block_store.checkout_ro()?;
738 let has_transaction = DataReader::<TransactionHash, Transaction>::exists(
739 &txn,
740 transaction_id.transaction_hash(),
741 )?;
742 responder.respond(has_transaction).ignore()
743 }
744 StorageRequest::GetExecutionResults {
745 block_hash,
746 responder,
747 } => {
748 let txn = self.block_store.checkout_ro()?;
749 responder
750 .respond(Self::get_execution_results_with_transaction_headers(
751 &txn,
752 &block_hash,
753 )?)
754 .ignore()
755 }
756 StorageRequest::GetBlockExecutionResultsOrChunk { id, responder } => responder
757 .respond(self.read_block_execution_results_or_chunk(&id)?)
758 .ignore(),
759 StorageRequest::PutExecutionResults {
760 block_hash,
761 block_height,
762 era_id,
763 execution_results,
764 responder,
765 } => {
766 let mut rw_txn = self.block_store.checkout_rw()?;
767 let _ = rw_txn.write(&BlockExecutionResults {
768 block_info: BlockHashHeightAndEra::new(*block_hash, block_height, era_id),
769 exec_results: execution_results,
770 })?;
771 rw_txn.commit()?;
772 responder.respond(()).ignore()
773 }
774 StorageRequest::GetFinalitySignature { id, responder } => {
775 let maybe_sig = self
776 .block_store
777 .checkout_ro()?
778 .read(*id.block_hash())?
779 .and_then(|sigs: BlockSignatures| sigs.finality_signature(id.public_key()))
780 .filter(|sig| sig.era_id() == id.era_id());
781 responder.respond(maybe_sig).ignore()
782 }
783 StorageRequest::IsFinalitySignatureStored { id, responder } => {
784 let has_signature = self
785 .block_store
786 .checkout_ro()?
787 .read(*id.block_hash())?
788 .map(|sigs: BlockSignatures| sigs.has_finality_signature(id.public_key()))
789 .unwrap_or(false);
790 responder.respond(has_signature).ignore()
791 }
792 StorageRequest::GetBlockAndMetadataByHeight {
793 block_height,
794 only_from_available_block_range,
795 responder,
796 } => {
797 if !(self.should_return_block(block_height, only_from_available_block_range)) {
798 return Ok(responder.respond(None).ignore());
799 }
800
801 let ro_txn = self.block_store.checkout_ro()?;
802
803 let block: Block = {
804 if let Some(block) = ro_txn.read(block_height)? {
805 block
806 } else {
807 return Ok(responder.respond(None).ignore());
808 }
809 };
810
811 let hash = block.hash();
812 let block_signatures = match ro_txn.read(*hash)? {
813 Some(signatures) => signatures,
814 None => self.get_default_block_signatures(&block),
815 };
816 responder
817 .respond(Some(BlockWithMetadata {
818 block,
819 block_signatures,
820 }))
821 .ignore()
822 }
823 StorageRequest::PutBlockSignatures {
824 signatures,
825 responder,
826 } => {
827 if signatures.is_empty() {
828 error!(
829 ?signatures,
830 "should not attempt to store empty collection of block signatures"
831 );
832 return Ok(responder.respond(false).ignore());
833 }
834 let mut txn = self.block_store.checkout_rw()?;
835 let old_data: Option<BlockSignatures> = txn.read(*signatures.block_hash())?;
836 let new_data = match old_data {
837 None => signatures,
838 Some(mut data) => {
839 if let Err(error) = data.merge(signatures) {
840 error!(%error, "failed to put block signatures");
841 return Ok(responder.respond(false).ignore());
842 }
843 data
844 }
845 };
846 let _ = txn.write(&new_data)?;
847 txn.commit()?;
848 responder.respond(true).ignore()
849 }
850 StorageRequest::PutFinalitySignature {
851 signature,
852 responder,
853 } => {
854 let mut rw_txn = self.block_store.checkout_rw()?;
855 let block_hash = signature.block_hash();
856 let mut block_signatures: BlockSignatures =
857 if let Some(existing_signatures) = rw_txn.read(*block_hash)? {
858 existing_signatures
859 } else {
860 match &*signature {
861 FinalitySignature::V1(signature) => {
862 BlockSignaturesV1::new(*signature.block_hash(), signature.era_id())
863 .into()
864 }
865 FinalitySignature::V2(signature) => BlockSignaturesV2::new(
866 *signature.block_hash(),
867 signature.block_height(),
868 signature.era_id(),
869 signature.chain_name_hash(),
870 )
871 .into(),
872 }
873 };
874 match (&mut block_signatures, *signature) {
875 (
876 BlockSignatures::V1(ref mut block_signatures),
877 FinalitySignature::V1(signature),
878 ) => {
879 block_signatures.insert_signature(
880 signature.public_key().clone(),
881 *signature.signature(),
882 );
883 }
884 (
885 BlockSignatures::V2(ref mut block_signatures),
886 FinalitySignature::V2(signature),
887 ) => {
888 block_signatures.insert_signature(
889 signature.public_key().clone(),
890 *signature.signature(),
891 );
892 }
893 (block_signatures, signature) => {
894 let mismatch =
895 VariantMismatch(Box::new((block_signatures.clone(), signature)));
896 return Err(FatalStorageError::from(mismatch));
897 }
898 }
899
900 let _ = rw_txn.write(&block_signatures);
901 rw_txn.commit()?;
902 responder.respond(true).ignore()
903 }
904 StorageRequest::GetBlockSignature {
905 block_hash,
906 public_key,
907 responder,
908 } => {
909 let maybe_signatures: Option<BlockSignatures> =
910 self.block_store.checkout_ro()?.read(block_hash)?;
911 responder
912 .respond(
913 maybe_signatures
914 .and_then(|signatures| signatures.finality_signature(&public_key)),
915 )
916 .ignore()
917 }
918 StorageRequest::GetBlockHeaderByHeight {
919 block_height,
920 only_from_available_block_range,
921 responder,
922 } => {
923 let maybe_header = self
924 .read_block_header_by_height(block_height, only_from_available_block_range)?;
925 responder.respond(maybe_header).ignore()
926 }
927 StorageRequest::GetLatestSwitchBlockHeader { responder } => {
928 let txn = self.block_store.checkout_ro()?;
929 let maybe_header = txn.read(LatestSwitchBlock)?;
930 responder.respond(maybe_header).ignore()
931 }
932 StorageRequest::GetSwitchBlockHeaderByEra { era_id, responder } => {
933 let txn = self.block_store.checkout_ro()?;
934 let maybe_header = txn.read(era_id)?;
935 responder.respond(maybe_header).ignore()
936 }
937 StorageRequest::PutBlockHeader {
938 block_header,
939 responder,
940 } => {
941 let mut rw_txn = self.block_store.checkout_rw()?;
942 let _ = rw_txn.write(&*block_header)?;
943 rw_txn.commit()?;
944 responder.respond(true).ignore()
945 }
946 StorageRequest::GetAvailableBlockRange { responder } => {
947 responder.respond(self.get_available_block_range()).ignore()
948 }
949 StorageRequest::StoreFinalizedApprovals {
950 ref transaction_hash,
951 ref finalized_approvals,
952 responder,
953 } => {
954 info!(txt=?transaction_hash, count=finalized_approvals.len(), "storing finalized approvals {:?}", finalized_approvals);
955 responder
956 .respond(self.store_finalized_approvals(transaction_hash, finalized_approvals)?)
957 .ignore()
958 }
959 StorageRequest::PutExecutedBlock {
960 block,
961 approvals_hashes,
962 execution_results,
963 responder,
964 } => {
965 let block: Block = (*block).clone().into();
966 let transaction_config = self.transaction_config.clone();
967 responder
968 .respond(self.put_executed_block(
969 transaction_config,
970 &block,
971 &approvals_hashes,
972 execution_results,
973 )?)
974 .ignore()
975 }
976 StorageRequest::GetKeyBlockHeightForActivationPoint { responder } => {
977 if self.key_block_height_for_activation_point.is_none() {
979 let key_block_era = self.activation_era.predecessor().unwrap_or_default();
980 let txn = self.block_store.checkout_ro()?;
981 let key_block_header: BlockHeader = match txn.read(key_block_era)? {
982 Some(block_header) => block_header,
983 None => return Ok(responder.respond(None).ignore()),
984 };
985 self.key_block_height_for_activation_point = Some(key_block_header.height());
986 }
987 responder
988 .respond(self.key_block_height_for_activation_point)
989 .ignore()
990 }
991 StorageRequest::GetRawData {
992 key,
993 responder,
994 record_id,
995 } => {
996 let db_table_id = utils::db_table_id_from_record_id(record_id)
997 .map_err(|_| FatalStorageError::UnexpectedRecordId(record_id))?;
998 let txn = self.block_store.checkout_ro()?;
999 let maybe_data: Option<DbRawBytesSpec> = txn.read((db_table_id, key))?;
1000 match maybe_data {
1001 None => responder.respond(None).ignore(),
1002 Some(db_raw) => responder.respond(Some(db_raw)).ignore(),
1003 }
1004 }
1005 StorageRequest::GetBlockUtilizationScore {
1006 era_id,
1007 block_height,
1008 switch_block_utilization,
1009 responder,
1010 } => {
1011 let utilization = self.get_block_utilization_score(
1012 era_id,
1013 block_height,
1014 switch_block_utilization,
1015 );
1016
1017 responder.respond(utilization).ignore()
1018 }
1019 })
1020 }
1021
1022 pub(crate) fn read_block_header_by_height(
1023 &self,
1024 block_height: u64,
1025 only_from_available_block_range: bool,
1026 ) -> Result<Option<BlockHeader>, FatalStorageError> {
1027 if !(self.should_return_block(block_height, only_from_available_block_range)) {
1028 Ok(None)
1029 } else {
1030 let txn = self.block_store.checkout_ro()?;
1031 txn.read(block_height).map_err(FatalStorageError::from)
1032 }
1033 }
1034
1035 pub(crate) fn get_switch_block_by_era_id(
1036 &self,
1037 era_id: &EraId,
1038 ) -> Result<Option<Block>, FatalStorageError> {
1039 let txn = self.block_store.checkout_ro()?;
1040 txn.read(*era_id).map_err(FatalStorageError::from)
1041 }
1042
1043 #[allow(clippy::type_complexity)]
1045 fn get_transactions_with_finalized_approvals<'a>(
1046 &self,
1047 transaction_hashes: impl Iterator<Item = &'a TransactionHash>,
1048 ) -> Result<SmallVec<[Option<(Transaction, Option<BTreeSet<Approval>>)>; 1]>, FatalStorageError>
1049 {
1050 let ro_txn = self.block_store.checkout_ro()?;
1051
1052 transaction_hashes
1053 .map(|transaction_hash| {
1054 Self::get_transaction_with_finalized_approvals(&ro_txn, transaction_hash)
1055 })
1056 .collect()
1057 }
1058
1059 pub(crate) fn put_executed_block(
1060 &mut self,
1061 transaction_config: TransactionConfig,
1062 block: &Block,
1063 approvals_hashes: &ApprovalsHashes,
1064 execution_results: HashMap<TransactionHash, ExecutionResult>,
1065 ) -> Result<bool, FatalStorageError> {
1066 let mut txn = self.block_store.checkout_rw()?;
1067 let era_id = block.era_id();
1068 let block_utilization_score = block.block_utilization(transaction_config.clone());
1069 let has_hit_slot_limit = block.has_hit_slot_capacity(transaction_config.clone());
1070 let block_hash = txn.write(block)?;
1071 let _ = txn.write(approvals_hashes)?;
1072 let block_info = BlockHashHeightAndEra::new(block_hash, block.height(), block.era_id());
1073
1074 let utilization = if has_hit_slot_limit {
1075 debug!("Block is at slot capacity, using slot utilization score");
1076 block_utilization_score
1077 } else if execution_results.is_empty() {
1078 0u64
1079 } else {
1080 let total_gas_utilization = {
1081 let total_gas_limit: U512 = execution_results
1082 .values()
1083 .map(|results| match results {
1084 ExecutionResult::V1(v1_result) => match v1_result {
1085 ExecutionResultV1::Failure { cost, .. } => *cost,
1086 ExecutionResultV1::Success { cost, .. } => *cost,
1087 },
1088 ExecutionResult::V2(v2_result) => v2_result.limit.value(),
1089 })
1090 .sum();
1091
1092 let consumed: u64 = total_gas_limit.as_u64();
1093 let block_gas_limit = transaction_config.block_gas_limit;
1094
1095 Ratio::new(consumed * 100u64, block_gas_limit).to_integer()
1096 };
1097 debug!("Gas utilization at {total_gas_utilization}");
1098
1099 let total_size_utilization = {
1100 let size_used: u64 = execution_results
1101 .values()
1102 .map(|results| {
1103 if let ExecutionResult::V2(result) = results {
1104 result.size_estimate
1105 } else {
1106 0u64
1107 }
1108 })
1109 .sum();
1110
1111 let block_size_limit = transaction_config.max_block_size as u64;
1112 Ratio::new(size_used * 100, block_size_limit).to_integer()
1113 };
1114
1115 debug!("Storage utilization at {total_size_utilization}");
1116
1117 let scores = [
1118 block_utilization_score,
1119 total_size_utilization,
1120 total_gas_utilization,
1121 ];
1122
1123 match scores.iter().max() {
1124 Some(max_utlization) => *max_utlization,
1125 None => {
1126 warn!("Unable to determine max utilization, marking 0 utilization");
1129 0u64
1130 }
1131 }
1132 };
1133
1134 debug!("Utilization for block is {utilization}");
1135
1136 let _ = txn.write(&BlockExecutionResults {
1137 block_info,
1138 exec_results: execution_results,
1139 })?;
1140 txn.commit()?;
1141
1142 match self.utilization_tracker.get_mut(&era_id) {
1143 Some(block_score) => {
1144 block_score.insert(block.height(), utilization);
1145 }
1146 None => {
1147 let mut block_score = BTreeMap::new();
1148 block_score.insert(block.height(), utilization);
1149 self.utilization_tracker.insert(era_id, block_score);
1150 }
1151 }
1152
1153 Ok(true)
1154 }
1155
1156 fn handle_mark_block_completed_request(
1158 &mut self,
1159 MarkBlockCompletedRequest {
1160 block_height,
1161 responder,
1162 }: MarkBlockCompletedRequest,
1163 ) -> Result<Effects<Event>, FatalStorageError> {
1164 let is_new = self.mark_block_complete(block_height)?;
1165 Ok(responder.respond(is_new).ignore())
1166 }
1167
1168 fn mark_block_complete(&mut self, block_height: u64) -> Result<bool, FatalStorageError> {
1171 let is_new = self.completed_blocks.insert(block_height);
1172 if is_new {
1173 self.persist_completed_blocks()?;
1174 info!(
1175 "Storage: marked block {} complete: {}",
1176 block_height,
1177 self.get_available_block_range()
1178 );
1179 self.update_chain_height_metrics();
1180 } else {
1181 debug!(
1182 "Storage: tried to mark already-complete block {} complete",
1183 block_height
1184 );
1185 }
1186 Ok(is_new)
1187 }
1188
1189 fn persist_completed_blocks(&mut self) -> Result<(), FatalStorageError> {
1191 let serialized = self
1192 .completed_blocks
1193 .to_bytes()
1194 .map_err(FatalStorageError::UnexpectedSerializationFailure)?;
1195 let mut rw_txn = self.block_store.checkout_rw()?;
1196 rw_txn.write(&StateStore {
1197 key: Cow::Borrowed(COMPLETED_BLOCKS_STORAGE_KEY),
1198 value: serialized,
1199 })?;
1200 rw_txn.commit().map_err(FatalStorageError::from)
1201 }
1202
1203 pub(crate) fn highest_complete_block_height(&self) -> Option<u64> {
1205 self.completed_blocks.highest_sequence().map(Sequence::high)
1206 }
1207
1208 pub(crate) fn read_blocks_for_replay_protection(
1215 &self,
1216 ) -> Result<Vec<Block>, FatalStorageError> {
1217 let ro_txn = self.block_store.checkout_ro()?;
1218
1219 let timestamp =
1220 match DataReader::<LatestSwitchBlock, BlockHeader>::read(&ro_txn, LatestSwitchBlock)? {
1221 Some(last_era_header) => last_era_header
1222 .timestamp()
1223 .saturating_sub(self.max_ttl.value()),
1224 None => Timestamp::now(),
1225 };
1226
1227 let mut blocks = Vec::new();
1228 for sequence in self.completed_blocks.sequences().iter().rev() {
1229 let hi = sequence.high();
1230 let low = sequence.low();
1231 for idx in (low..=hi).rev() {
1232 let maybe_block: Result<Option<Block>, BlockStoreError> = ro_txn.read(idx);
1233 match maybe_block {
1234 Ok(Some(block)) => {
1235 let should_continue = block.timestamp() >= timestamp;
1236 blocks.push(block);
1237 if false == should_continue {
1238 return Ok(blocks);
1239 }
1240 }
1241 Ok(None) => {
1242 continue;
1243 }
1244 Err(err) => return Err(FatalStorageError::BlockStoreError(err)),
1245 }
1246 }
1247 }
1248 Ok(blocks)
1249 }
1250
1251 pub(crate) fn make_executable_block(
1253 &self,
1254 block_hash: &BlockHash,
1255 ) -> Result<Option<ExecutableBlock>, FatalStorageError> {
1256 let (block, transactions) =
1257 match self.read_block_and_finalized_transactions_by_hash(*block_hash)? {
1258 Some(block_and_finalized_transactions) => block_and_finalized_transactions,
1259 None => {
1260 error!(
1261 ?block_hash,
1262 "Storage: unable to make_executable_block for {}", block_hash
1263 );
1264 return Ok(None);
1265 }
1266 };
1267 let maybe_finalized_approvals: Option<ApprovalsHashes> =
1268 self.block_store.checkout_ro()?.read(*block.hash())?;
1269 if let Some(finalized_approvals) = maybe_finalized_approvals {
1270 if transactions.len() != finalized_approvals.approvals_hashes().len() {
1271 error!(
1272 ?block_hash,
1273 "Storage: transaction hashes length mismatch {}", block_hash
1274 );
1275 return Err(FatalStorageError::ApprovalsHashesLengthMismatch {
1276 block_hash: *block_hash,
1277 expected: transactions.len(),
1278 actual: finalized_approvals.approvals_hashes().len(),
1279 });
1280 }
1281 for (transaction, hash) in transactions
1282 .iter()
1283 .zip(finalized_approvals.approvals_hashes())
1284 {
1285 let computed_hash = transaction.compute_approvals_hash().map_err(|error| {
1286 error!(%error, "failed to serialize approvals");
1287 FatalStorageError::UnexpectedSerializationFailure(error)
1288 })?;
1289 if computed_hash == hash {
1290 continue;
1291 }
1292 error!(?block_hash, "Storage: transaction with incorrect approvals");
1297 return Ok(None);
1298 }
1299 }
1300
1301 let executable_block = ExecutableBlock::from_block_and_transactions(block, transactions);
1302 info!(%block_hash, "Storage: created {}", executable_block);
1303 Ok(Some(executable_block))
1304 }
1305
1306 fn read_block_and_finalized_transactions_by_hash(
1309 &self,
1310 block_hash: BlockHash,
1311 ) -> Result<Option<(BlockV2, Vec<Transaction>)>, FatalStorageError> {
1312 let txn = self.block_store.checkout_ro()?;
1313
1314 let Some(block) = txn.read(block_hash)? else {
1315 debug!(
1316 ?block_hash,
1317 "Storage: read_block_and_finalized_transactions_by_hash failed to get block for {}",
1318 block_hash
1319 );
1320 return Ok(None);
1321 };
1322
1323 let Block::V2(block) = block else {
1324 debug!(
1325 ?block_hash,
1326 "Storage: read_block_and_finalized_transactions_by_hash expected block V2 {}",
1327 block_hash
1328 );
1329 return Ok(None);
1330 };
1331
1332 let mut transactions = vec![];
1333 for (transaction, _) in (self
1334 .get_transactions_with_finalized_approvals(block.all_transactions())?)
1335 .into_iter()
1336 .flatten()
1337 {
1338 transactions.push(transaction);
1339 }
1340
1341 Ok(Some((block, transactions)))
1342 }
1343
1344 fn get_highest_complete_block_header(&self) -> Result<Option<BlockHeader>, FatalStorageError> {
1347 let highest_complete_block_height = match self.completed_blocks.highest_sequence() {
1348 Some(sequence) => sequence.high(),
1349 None => {
1350 return Ok(None);
1351 }
1352 };
1353
1354 let txn = self.block_store.checkout_ro()?;
1355 txn.read(highest_complete_block_height)
1356 .map_err(FatalStorageError::from)
1357 }
1358
1359 fn get_highest_complete_block_header_with_signatures(
1362 &self,
1363 txn: &(impl DataReader<BlockHeight, BlockHeader> + DataReader<BlockHash, BlockSignatures>),
1364 ) -> Result<Option<BlockHeaderWithSignatures>, FatalStorageError> {
1365 let highest_complete_block_height = match self.completed_blocks.highest_sequence() {
1366 Some(sequence) => sequence.high(),
1367 None => {
1368 return Ok(None);
1369 }
1370 };
1371
1372 let block_header: Option<BlockHeader> = txn.read(highest_complete_block_height)?;
1373 match block_header {
1374 Some(header) => {
1375 let block_header_hash = header.block_hash();
1376 let block_signatures: BlockSignatures = match txn.read(block_header_hash)? {
1377 Some(signatures) => signatures,
1378 None => match &header {
1379 BlockHeader::V1(header) => BlockSignatures::V1(BlockSignaturesV1::new(
1380 header.block_hash(),
1381 header.era_id(),
1382 )),
1383 BlockHeader::V2(header) => BlockSignatures::V2(BlockSignaturesV2::new(
1384 header.block_hash(),
1385 header.height(),
1386 header.era_id(),
1387 self.chain_name_hash,
1388 )),
1389 },
1390 };
1391 Ok(Some(BlockHeaderWithSignatures::new(
1392 header,
1393 block_signatures,
1394 )))
1395 }
1396 None => Ok(None),
1397 }
1398 }
1399
1400 pub fn get_highest_complete_block(&self) -> Result<Option<Block>, FatalStorageError> {
1402 let highest_complete_block_height = match self.highest_complete_block_height() {
1403 Some(height) => height,
1404 None => {
1405 return Ok(None);
1406 }
1407 };
1408
1409 let txn = self.block_store.checkout_ro()?;
1410 txn.read(highest_complete_block_height)
1411 .map_err(FatalStorageError::from)
1412 }
1413
1414 fn get_single_block_header_restricted(
1418 &self,
1419 txn: &impl DataReader<BlockHash, BlockHeader>,
1420 block_hash: &BlockHash,
1421 only_from_available_block_range: bool,
1422 ) -> Result<Option<BlockHeader>, FatalStorageError> {
1423 let block_header = match txn.read(*block_hash)? {
1424 Some(header) => header,
1425 None => return Ok(None),
1426 };
1427
1428 if !(self.should_return_block(block_header.height(), only_from_available_block_range)) {
1429 return Ok(None);
1430 }
1431
1432 Ok(Some(block_header))
1433 }
1434
1435 fn get_trusted_ancestor_headers(
1438 &self,
1439 txn: &impl DataReader<BlockHash, BlockHeader>,
1440 trusted_block_header: &BlockHeader,
1441 ) -> Result<Option<Vec<BlockHeader>>, FatalStorageError> {
1442 if trusted_block_header.is_genesis() {
1443 return Ok(Some(vec![]));
1444 }
1445 let available_block_range = self.get_available_block_range();
1446 let mut result = vec![];
1447 let mut current_trusted_block_header = trusted_block_header.clone();
1448 loop {
1449 let parent_hash = current_trusted_block_header.parent_hash();
1450 let parent_block_header: BlockHeader = match txn.read(*parent_hash)? {
1451 Some(block_header) => block_header,
1452 None => {
1453 warn!(%parent_hash, "block header not found");
1454 return Ok(None);
1455 }
1456 };
1457
1458 if !available_block_range.contains(parent_block_header.height()) {
1459 debug!(%parent_hash, "block header not complete");
1460 return Ok(None);
1461 }
1462
1463 result.push(parent_block_header.clone());
1464 if parent_block_header.is_switch_block() || parent_block_header.is_genesis() {
1465 break;
1466 }
1467 current_trusted_block_header = parent_block_header;
1468 }
1469 Ok(Some(result))
1470 }
1471
1472 fn get_block_headers_with_signatures(
1475 &self,
1476 txn: &(impl DataReader<BlockHash, BlockSignatures> + DataReader<EraId, BlockHeader>),
1477 trusted_block_header: &BlockHeader,
1478 highest_block_header_with_signatures: &BlockHeaderWithSignatures,
1479 ) -> Result<Option<Vec<BlockHeaderWithSignatures>>, FatalStorageError> {
1480 if trusted_block_header.block_hash()
1481 == highest_block_header_with_signatures
1482 .block_header()
1483 .block_hash()
1484 {
1485 return Ok(Some(vec![]));
1486 }
1487
1488 let start_era_id: u64 = trusted_block_header.next_block_era_id().into();
1489 let current_era_id: u64 = highest_block_header_with_signatures
1490 .block_header()
1491 .era_id()
1492 .into();
1493
1494 let mut result = vec![];
1495
1496 for era_id in start_era_id..current_era_id {
1497 let maybe_block_header: Option<BlockHeader> = txn.read(EraId::from(era_id))?;
1498 match maybe_block_header {
1499 Some(block_header) => {
1500 let block_signatures = match txn.read(block_header.block_hash())? {
1501 Some(signatures) => signatures,
1502 None => match &block_header {
1503 BlockHeader::V1(header) => BlockSignatures::V1(BlockSignaturesV1::new(
1504 header.block_hash(),
1505 header.era_id(),
1506 )),
1507 BlockHeader::V2(header) => BlockSignatures::V2(BlockSignaturesV2::new(
1508 header.block_hash(),
1509 header.height(),
1510 header.era_id(),
1511 self.chain_name_hash,
1512 )),
1513 },
1514 };
1515 result.push(BlockHeaderWithSignatures::new(
1516 block_header,
1517 block_signatures,
1518 ));
1519 }
1520 None => return Ok(None),
1521 }
1522 }
1523 result.push(highest_block_header_with_signatures.clone());
1524
1525 Ok(Some(result))
1526 }
1527
1528 fn store_finalized_approvals(
1533 &mut self,
1534 transaction_hash: &TransactionHash,
1535 finalized_approvals: &BTreeSet<Approval>,
1536 ) -> Result<bool, FatalStorageError> {
1537 let mut txn = self.block_store.checkout_rw()?;
1538 let original_transaction: Transaction = txn.read(*transaction_hash)?.ok_or({
1539 FatalStorageError::UnexpectedFinalizedApprovals {
1540 transaction_hash: *transaction_hash,
1541 }
1542 })?;
1543
1544 let maybe_existing_finalized_approvals: Option<BTreeSet<Approval>> =
1546 txn.read(*transaction_hash)?;
1547 if maybe_existing_finalized_approvals.as_ref() == Some(finalized_approvals) {
1548 return Ok(false);
1549 }
1550
1551 let original_approvals = original_transaction.approvals();
1552 if &original_approvals != finalized_approvals {
1553 let _ = txn.write(&TransactionFinalizedApprovals {
1554 transaction_hash: *transaction_hash,
1555 finalized_approvals: finalized_approvals.clone(),
1556 })?;
1557 txn.commit()?;
1558 return Ok(true);
1559 }
1560
1561 Ok(false)
1562 }
1563
1564 fn get_transfers(
1572 &mut self,
1573 block_hash: &BlockHash,
1574 ) -> Result<Option<Vec<Transfer>>, FatalStorageError> {
1575 let mut rw_txn = self.block_store.checkout_rw()?;
1576 let maybe_transfers: Option<Vec<Transfer>> = rw_txn.read(*block_hash)?;
1577 if let Some(transfers) = maybe_transfers {
1578 if !transfers.is_empty() {
1579 return Ok(Some(transfers));
1580 }
1581 }
1582
1583 let block: Block = match rw_txn.read(*block_hash)? {
1584 Some(block) => block,
1585 None => return Ok(None),
1586 };
1587
1588 let deploy_hashes: Vec<DeployHash> = match block.clone_body() {
1589 BlockBody::V1(v1) => v1.deploy_and_transfer_hashes().copied().collect(),
1590 BlockBody::V2(v2) => v2
1591 .all_transactions()
1592 .filter_map(|transaction_hash| match transaction_hash {
1593 TransactionHash::Deploy(deploy_hash) => Some(*deploy_hash),
1594 TransactionHash::V1(_) => None,
1595 })
1596 .collect(),
1597 };
1598
1599 let mut transfers: Vec<Transfer> = vec![];
1600 for deploy_hash in deploy_hashes {
1601 let transaction_hash = TransactionHash::Deploy(deploy_hash);
1602 let successful_xfers = match rw_txn.read(transaction_hash)? {
1603 Some(exec_result) => successful_transfers(&exec_result),
1604 None => {
1605 error!(%deploy_hash, %block_hash, "should have exec result");
1606 vec![]
1607 }
1608 };
1609 transfers.extend(successful_xfers);
1610 }
1611 rw_txn.write(&BlockTransfers {
1612 block_hash: *block_hash,
1613 transfers: transfers.clone(),
1614 })?;
1615 rw_txn.commit()?;
1616 Ok(Some(transfers))
1617 }
1618
1619 fn get_legacy_deploy(
1621 &self,
1622 deploy_hash: DeployHash,
1623 ) -> Result<Option<LegacyDeploy>, FatalStorageError> {
1624 let transaction_hash = TransactionHash::from(deploy_hash);
1625 let txn = self.block_store.checkout_ro()?;
1626 let transaction =
1627 match Self::get_transaction_with_finalized_approvals(&txn, &transaction_hash)? {
1628 Some((transaction, maybe_approvals)) => {
1629 if let Some(approvals) = maybe_approvals {
1630 transaction.with_approvals(approvals)
1631 } else {
1632 transaction
1633 }
1634 }
1635 None => return Ok(None),
1636 };
1637
1638 match transaction {
1639 Transaction::Deploy(deploy) => Ok(Some(LegacyDeploy::from(deploy))),
1640 transaction @ Transaction::V1(_) => {
1641 let mismatch = VariantMismatch(Box::new((transaction_hash, transaction)));
1642 error!(%mismatch, "failed getting legacy deploy");
1643 Err(FatalStorageError::from(mismatch))
1644 }
1645 }
1646 }
1647
1648 fn get_transaction_by_id(
1650 &self,
1651 transaction_id: TransactionId,
1652 ) -> Result<Option<Transaction>, FatalStorageError> {
1653 let transaction_hash = transaction_id.transaction_hash();
1654 let txn = self.block_store.checkout_ro()?;
1655
1656 let maybe_transaction: Option<Transaction> = txn.read(transaction_hash)?;
1657 let transaction: Transaction = match maybe_transaction {
1658 None => return Ok(None),
1659 Some(transaction) if transaction.fetch_id() == transaction_id => {
1660 return Ok(Some(transaction));
1661 }
1662 Some(transaction) => transaction,
1663 };
1664
1665 let finalized_approvals = match txn.read(transaction_hash)? {
1666 None => return Ok(None),
1667 Some(approvals) => approvals,
1668 };
1669
1670 match (
1671 transaction_id.approvals_hash(),
1672 finalized_approvals,
1673 transaction,
1674 ) {
1675 (approvals_hash, finalized_approvals, Transaction::Deploy(deploy)) => {
1676 match ApprovalsHash::compute(&finalized_approvals) {
1677 Ok(computed_approvals_hash) if computed_approvals_hash == approvals_hash => {
1678 let deploy = deploy.with_approvals(finalized_approvals);
1679 Ok(Some(Transaction::from(deploy)))
1680 }
1681 Ok(_computed_approvals_hash) => Ok(None),
1682 Err(error) => {
1683 error!(%error, "failed to calculate finalized deploy approvals hash");
1684 Err(FatalStorageError::UnexpectedSerializationFailure(error))
1685 }
1686 }
1687 }
1688 (approvals_hash, finalized_approvals, Transaction::V1(transaction_v1)) => {
1689 match ApprovalsHash::compute(&finalized_approvals) {
1690 Ok(computed_approvals_hash) if computed_approvals_hash == approvals_hash => {
1691 let transaction_v1 = transaction_v1.with_approvals(finalized_approvals);
1692 Ok(Some(Transaction::from(transaction_v1)))
1693 }
1694 Ok(_computed_approvals_hash) => Ok(None),
1695 Err(error) => {
1696 error!(%error, "failed to calculate finalized transaction approvals hash");
1697 Err(FatalStorageError::UnexpectedSerializationFailure(error))
1698 }
1699 }
1700 }
1701 }
1702 }
1703
1704 #[allow(clippy::type_complexity)]
1706 fn get_transaction_with_finalized_approvals(
1707 txn: &(impl DataReader<TransactionHash, Transaction>
1708 + DataReader<TransactionHash, BTreeSet<Approval>>),
1709 transaction_hash: &TransactionHash,
1710 ) -> Result<Option<(Transaction, Option<BTreeSet<Approval>>)>, FatalStorageError> {
1711 let maybe_transaction: Option<Transaction> = txn.read(*transaction_hash)?;
1712 let transaction = match maybe_transaction {
1713 Some(transaction) => transaction,
1714 None => return Ok(None),
1715 };
1716
1717 let maybe_finalized_approvals: Option<BTreeSet<Approval>> = txn.read(*transaction_hash)?;
1718 let ret = (transaction, maybe_finalized_approvals);
1719
1720 Ok(Some(ret))
1721 }
1722
1723 pub(crate) fn get_sync_leap(
1724 &self,
1725 sync_leap_identifier: SyncLeapIdentifier,
1726 ) -> Result<FetchResponse<SyncLeap, SyncLeapIdentifier>, FatalStorageError> {
1727 let block_hash = sync_leap_identifier.block_hash();
1728
1729 let txn = self.block_store.checkout_ro()?;
1730
1731 let only_from_available_block_range = true;
1732 let trusted_block_header = match self.get_single_block_header_restricted(
1733 &txn,
1734 &block_hash,
1735 only_from_available_block_range,
1736 )? {
1737 Some(trusted_block_header) => trusted_block_header,
1738 None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
1739 };
1740
1741 let trusted_ancestor_headers =
1742 match self.get_trusted_ancestor_headers(&txn, &trusted_block_header)? {
1743 Some(trusted_ancestor_headers) => trusted_ancestor_headers,
1744 None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
1745 };
1746
1747 if sync_leap_identifier.trusted_ancestor_only() {
1749 return Ok(FetchResponse::Fetched(SyncLeap {
1750 trusted_ancestor_only: true,
1751 trusted_block_header,
1752 trusted_ancestor_headers,
1753 block_headers_with_signatures: vec![],
1754 }));
1755 }
1756
1757 let highest_complete_block_header =
1758 match self.get_highest_complete_block_header_with_signatures(&txn)? {
1759 Some(highest_complete_block_header) => highest_complete_block_header,
1760 None => return Ok(FetchResponse::NotFound(sync_leap_identifier)),
1761 };
1762
1763 if highest_complete_block_header
1764 .block_header()
1765 .era_id()
1766 .saturating_sub(trusted_block_header.era_id().into())
1767 > self.recent_era_count.into()
1768 {
1769 return Ok(FetchResponse::NotProvided(sync_leap_identifier));
1770 }
1771
1772 if highest_complete_block_header.block_header().height() == 0 {
1773 return Ok(FetchResponse::Fetched(SyncLeap {
1774 trusted_ancestor_only: false,
1775 trusted_block_header,
1776 trusted_ancestor_headers: vec![],
1777 block_headers_with_signatures: vec![],
1778 }));
1779 }
1780
1781 if let Some(block_headers_with_signatures) = self.get_block_headers_with_signatures(
1784 &txn,
1785 &trusted_block_header,
1786 &highest_complete_block_header,
1787 )? {
1788 return Ok(FetchResponse::Fetched(SyncLeap {
1789 trusted_ancestor_only: false,
1790 trusted_block_header,
1791 trusted_ancestor_headers,
1792 block_headers_with_signatures,
1793 }));
1794 }
1795
1796 Ok(FetchResponse::NotFound(sync_leap_identifier))
1797 }
1798
1799 fn update_pool_and_send<REv, T>(
1807 &mut self,
1808 effect_builder: EffectBuilder<REv>,
1809 sender: NodeId,
1810 serialized_id: &[u8],
1811 fetch_response: FetchResponse<T, T::Id>,
1812 ) -> Result<Effects<Event>, FatalStorageError>
1813 where
1814 REv: From<NetworkRequest<Message>> + Send,
1815 T: FetchItem,
1816 {
1817 let serialized = fetch_response
1818 .to_serialized()
1819 .map_err(FatalStorageError::StoredItemSerializationFailure)?;
1820 let shared: Arc<[u8]> = serialized.into();
1821
1822 if self.enable_mem_deduplication && fetch_response.was_found() {
1823 self.serialized_item_pool
1824 .put(serialized_id.into(), Arc::downgrade(&shared));
1825 }
1826
1827 let message = Message::new_get_response_from_serialized(<T as FetchItem>::TAG, shared);
1828 Ok(effect_builder.send_message(sender, message).ignore())
1829 }
1830
1831 fn should_return_block(
1835 &self,
1836 block_height: u64,
1837 only_from_available_block_range: bool,
1838 ) -> bool {
1839 if only_from_available_block_range {
1840 self.get_available_block_range().contains(block_height)
1841 } else {
1842 true
1843 }
1844 }
1845
1846 pub(crate) fn get_available_block_range(&self) -> AvailableBlockRange {
1847 match self.completed_blocks.highest_sequence() {
1848 Some(&seq) => seq.into(),
1849 None => AvailableBlockRange::RANGE_0_0,
1850 }
1851 }
1852
1853 pub(crate) fn get_highest_orphaned_block_header(&self) -> HighestOrphanedBlockResult {
1854 match self.completed_blocks.highest_sequence() {
1855 None => HighestOrphanedBlockResult::MissingHighestSequence,
1856 Some(seq) => {
1857 let low = seq.low();
1858 let txn = self
1859 .block_store
1860 .checkout_ro()
1861 .expect("Could not start transaction for lmdb");
1862
1863 match txn.read(low) {
1864 Ok(Some(block)) => match block {
1865 Block::V1(_) | Block::V2(_) => {
1866 HighestOrphanedBlockResult::Orphan(block.clone_header())
1867 }
1868 },
1869 Ok(None) | Err(_) => HighestOrphanedBlockResult::MissingHeader(low),
1870 }
1871 }
1872 }
1873 }
1874
1875 pub(crate) fn read_highest_switch_block_headers(
1877 &self,
1878 count: u64,
1879 ) -> Result<Vec<BlockHeader>, FatalStorageError> {
1880 let txn = self.block_store.checkout_ro()?;
1881 if let Some(last_era_header) =
1882 DataReader::<LatestSwitchBlock, BlockHeader>::read(&txn, LatestSwitchBlock)?
1883 {
1884 let mut result = vec![];
1885 let last_era_id = last_era_header.era_id();
1886 result.push(last_era_header);
1887 for era_id in (0..last_era_id.value())
1888 .rev()
1889 .take(count as usize)
1890 .map(EraId::new)
1891 {
1892 match txn.read(era_id)? {
1893 None => break,
1894 Some(header) => result.push(header),
1895 }
1896 }
1897 result.reverse();
1898 debug!(
1899 ?result,
1900 "Storage: read_highest_switch_block_headers count:({})", count
1901 );
1902 Ok(result)
1903 } else {
1904 Ok(vec![])
1905 }
1906 }
1907
1908 fn read_block_execution_results_or_chunk(
1909 &self,
1910 request: &BlockExecutionResultsOrChunkId,
1911 ) -> Result<Option<BlockExecutionResultsOrChunk>, FatalStorageError> {
1912 let txn = self.block_store.checkout_ro()?;
1913
1914 let execution_results = match Self::get_execution_results(&txn, request.block_hash())? {
1915 Some(execution_results) => execution_results
1916 .into_iter()
1917 .map(|(_deploy_hash, execution_result)| execution_result)
1918 .collect(),
1919 None => return Ok(None),
1920 };
1921 Ok(BlockExecutionResultsOrChunk::new(
1922 *request.block_hash(),
1923 request.chunk_index(),
1924 execution_results,
1925 ))
1926 }
1927
1928 fn get_default_block_signatures(&self, block: &Block) -> BlockSignatures {
1929 match block {
1930 Block::V1(block) => BlockSignaturesV1::new(*block.hash(), block.era_id()).into(),
1931 Block::V2(block) => BlockSignaturesV2::new(
1932 *block.hash(),
1933 block.height(),
1934 block.era_id(),
1935 self.chain_name_hash,
1936 )
1937 .into(),
1938 }
1939 }
1940
1941 fn update_chain_height_metrics(&self) {
1942 if let Some(metrics) = self.metrics.as_ref() {
1943 if let Some(sequence) = self.completed_blocks.highest_sequence() {
1944 let highest_available_block: i64 = sequence.high().try_into().unwrap_or(i64::MIN);
1945 let lowest_available_block: i64 = sequence.low().try_into().unwrap_or(i64::MIN);
1946 metrics.chain_height.set(highest_available_block);
1947 metrics.highest_available_block.set(highest_available_block);
1948 metrics.lowest_available_block.set(lowest_available_block);
1949 }
1950 }
1951 }
1952
1953 pub(crate) fn read_block_header_by_hash(
1954 &self,
1955 block_hash: &BlockHash,
1956 ) -> Result<Option<BlockHeader>, FatalStorageError> {
1957 let ro_txn = self.block_store.checkout_ro()?;
1958
1959 ro_txn.read(*block_hash).map_err(FatalStorageError::from)
1960 }
1961
1962 fn get_execution_results(
1963 txn: &(impl DataReader<BlockHash, Block> + DataReader<TransactionHash, ExecutionResult>),
1964 block_hash: &BlockHash,
1965 ) -> Result<Option<Vec<(TransactionHash, ExecutionResult)>>, FatalStorageError> {
1966 let block = txn.read(*block_hash)?;
1967
1968 let block_body = match block {
1969 Some(block) => block.take_body(),
1970 None => return Ok(None),
1971 };
1972
1973 let transaction_hashes: Vec<TransactionHash> = match block_body {
1974 BlockBody::V1(v1) => v1
1975 .deploy_and_transfer_hashes()
1976 .map(TransactionHash::from)
1977 .collect(),
1978 BlockBody::V2(v2) => v2.all_transactions().copied().collect(),
1979 };
1980 let mut execution_results = vec![];
1981 for transaction_hash in transaction_hashes {
1982 match txn.read(transaction_hash)? {
1983 None => {
1984 debug!(
1985 %block_hash,
1986 %transaction_hash,
1987 "retrieved block but execution result for given transaction is absent"
1988 );
1989 return Ok(None);
1990 }
1991 Some(execution_result) => {
1992 execution_results.push((transaction_hash, execution_result));
1993 }
1994 }
1995 }
1996 Ok(Some(execution_results))
1997 }
1998
1999 #[allow(clippy::type_complexity)]
2000 fn get_execution_results_with_transaction_headers(
2001 txn: &(impl DataReader<BlockHash, Block>
2002 + DataReader<TransactionHash, ExecutionResult>
2003 + DataReader<TransactionHash, Transaction>),
2004 block_hash: &BlockHash,
2005 ) -> Result<Option<Vec<(TransactionHash, TransactionHeader, ExecutionResult)>>, FatalStorageError>
2006 {
2007 let execution_results = match Self::get_execution_results(txn, block_hash)? {
2008 Some(execution_results) => execution_results,
2009 None => return Ok(None),
2010 };
2011
2012 let mut ret = Vec::with_capacity(execution_results.len());
2013 for (transaction_hash, execution_result) in execution_results {
2014 match txn.read(transaction_hash)? {
2015 None => {
2016 error!(
2017 %block_hash,
2018 %transaction_hash,
2019 "missing transaction"
2020 );
2021 return Ok(None);
2022 }
2023 Some(Transaction::Deploy(deploy)) => ret.push((
2024 transaction_hash,
2025 deploy.take_header().into(),
2026 execution_result,
2027 )),
2028 Some(Transaction::V1(transaction_v1)) => {
2029 ret.push((transaction_hash, (&transaction_v1).into(), execution_result))
2030 }
2031 };
2032 }
2033 Ok(Some(ret))
2034 }
2035
2036 fn get_block_utilization_score(
2037 &mut self,
2038 era_id: EraId,
2039 block_height: u64,
2040 block_utilization: u64,
2041 ) -> Option<(u64, u64)> {
2042 let ret = match self.utilization_tracker.get_mut(&era_id) {
2043 Some(utilization) => {
2044 utilization.entry(block_height).or_insert(block_utilization);
2045
2046 let transaction_count = utilization.values().sum();
2047 let block_count = utilization.keys().len() as u64;
2048
2049 Some((transaction_count, block_count))
2050 }
2051 None => {
2052 let mut utilization = BTreeMap::new();
2053 utilization.insert(block_height, block_utilization);
2054
2055 self.utilization_tracker.insert(era_id, utilization);
2056
2057 let block_count = 1u64;
2058 Some((block_utilization, block_count))
2059 }
2060 };
2061
2062 self.utilization_tracker
2063 .retain(|key_era_id, _| key_era_id.value() + 2 >= era_id.value());
2064
2065 ret
2066 }
2067}
2068
2069fn decode_item_id<T>(raw: &[u8]) -> Result<T::Id, GetRequestError>
2071where
2072 T: FetchItem,
2073{
2074 bincode::deserialize(raw).map_err(GetRequestError::MalformedIncomingItemId)
2075}
2076
2077fn should_move_storage_files_to_network_subdir(
2078 root: &Path,
2079 file_names: &[&str],
2080) -> Result<bool, FatalStorageError> {
2081 let mut files_found = vec![];
2082 let mut files_not_found = vec![];
2083
2084 for file_name in file_names {
2085 let file_path = root.join(file_name);
2086
2087 if file_path.exists() {
2088 files_found.push(file_path);
2089 } else {
2090 files_not_found.push(file_path);
2091 }
2092 }
2093
2094 let should_move_files = files_found.len() == file_names.len();
2095
2096 if !should_move_files && !files_found.is_empty() {
2097 error!(
2098 "found storage files: {:?}, missing storage files: {:?}",
2099 files_found, files_not_found
2100 );
2101
2102 return Err(FatalStorageError::MissingStorageFiles {
2103 missing_files: files_not_found,
2104 });
2105 }
2106
2107 Ok(should_move_files)
2108}
2109
2110fn move_storage_files_to_network_subdir(
2111 root: &Path,
2112 subdir: &Path,
2113 file_names: &[&str],
2114) -> Result<(), FatalStorageError> {
2115 file_names
2116 .iter()
2117 .map(|file_name| {
2118 let source_path = root.join(file_name);
2119 let dest_path = subdir.join(file_name);
2120 fs::rename(&source_path, &dest_path).map_err(|original_error| {
2121 FatalStorageError::UnableToMoveFile {
2122 source_path,
2123 dest_path,
2124 original_error,
2125 }
2126 })
2127 })
2128 .collect::<Result<Vec<_>, FatalStorageError>>()?;
2129
2130 info!(
2131 "moved files: {:?} from: {:?} to: {:?}",
2132 file_names, root, subdir
2133 );
2134 Ok(())
2135}
2136
2137fn successful_transfers(execution_result: &ExecutionResult) -> Vec<Transfer> {
2140 let mut all_transfers: Vec<Transfer> = vec![];
2141 match execution_result {
2142 ExecutionResult::V1(ExecutionResultV1::Success { effect, .. }) => {
2143 for transform_v1 in &effect.transforms {
2144 if let execution_result_v1::TransformKindV1::WriteTransfer(transfer_v1) =
2145 &transform_v1.transform
2146 {
2147 all_transfers.push(Transfer::V1(transfer_v1.clone()));
2148 }
2149 }
2150 }
2151 ExecutionResult::V2(execution_result_v2) => {
2152 if execution_result_v2.error_message.is_none() {
2153 for transfer in &execution_result_v2.transfers {
2154 all_transfers.push(transfer.clone());
2155 }
2156 }
2157 }
2159 ExecutionResult::V1(ExecutionResultV1::Failure { .. }) => {
2160 }
2162 }
2163 all_transfers
2164}
2165
2166#[cfg(test)]
2169impl Storage {
2170 pub(crate) fn get_transaction_with_finalized_approvals_by_hash(
2176 &self,
2177 transaction_hash: &TransactionHash,
2178 ) -> Option<(Transaction, Option<BTreeSet<Approval>>)> {
2179 let txn = self
2180 .block_store
2181 .checkout_ro()
2182 .expect("could not create RO transaction");
2183 Self::get_transaction_with_finalized_approvals(&txn, transaction_hash)
2184 .expect("could not retrieve a transaction with finalized approvals from storage")
2185 }
2186
2187 pub(crate) fn read_execution_result(
2193 &self,
2194 transaction_hash: &TransactionHash,
2195 ) -> Option<ExecutionResult> {
2196 self.block_store
2197 .checkout_ro()
2198 .expect("could not create RO transaction")
2199 .read(*transaction_hash)
2200 .expect("could not retrieve execution result from storage")
2201 }
2202
2203 pub(crate) fn get_transaction_by_hash(
2209 &self,
2210 transaction_hash: TransactionHash,
2211 ) -> Option<Transaction> {
2212 self.block_store
2213 .checkout_ro()
2214 .expect("could not create RO transaction")
2215 .read(transaction_hash)
2216 .expect("could not retrieve value from storage")
2217 }
2218
2219 pub(crate) fn read_block_by_hash(&self, block_hash: BlockHash) -> Option<Block> {
2220 self.block_store
2221 .checkout_ro()
2222 .expect("could not create RO transaction")
2223 .read(block_hash)
2224 .expect("could not retrieve value from storage")
2225 }
2226
2227 pub(crate) fn read_block_by_height(&self, height: u64) -> Option<Block> {
2228 self.block_store
2229 .checkout_ro()
2230 .expect("could not create RO transaction")
2231 .read(height)
2232 .expect("could not retrieve value from storage")
2233 }
2234
2235 pub(crate) fn read_highest_block(&self) -> Option<Block> {
2236 self.block_store
2237 .checkout_ro()
2238 .expect("could not create RO transaction")
2239 .read(Tip)
2240 .expect("could not retrieve value from storage")
2241 }
2242
2243 pub(crate) fn read_highest_block_header(&self) -> Option<BlockHeader> {
2244 self.block_store
2245 .checkout_ro()
2246 .expect("could not create RO transaction")
2247 .read(Tip)
2248 .expect("could not retrieve value from storage")
2249 }
2250
2251 pub(crate) fn get_finality_signatures_for_block(
2252 &self,
2253 block_hash: BlockHash,
2254 ) -> Option<BlockSignatures> {
2255 let txn = self
2256 .block_store
2257 .checkout_ro()
2258 .expect("could not create RO transaction");
2259 let res: Option<BlockSignatures> = txn
2260 .read(block_hash)
2261 .expect("could not retrieve value from storage");
2262 txn.commit().expect("Could not commit transaction");
2263 res
2264 }
2265
2266 pub(crate) fn read_switch_block_by_era_id(&self, era_id: EraId) -> Option<Block> {
2267 self.block_store
2268 .checkout_ro()
2269 .expect("could not create RO transaction")
2270 .read(era_id)
2271 .expect("could not retrieve value from storage")
2272 }
2273
2274 pub(crate) fn read_block_with_signatures_by_hash(
2275 &self,
2276 block_hash: BlockHash,
2277 only_from_available_block_range: bool,
2278 ) -> Option<BlockWithSignatures> {
2279 let ro_txn = self
2280 .block_store
2281 .checkout_ro()
2282 .expect("should create ro txn");
2283 let block: Block = ro_txn.read(block_hash).expect("should read block")?;
2284
2285 if !(self.should_return_block(block.height(), only_from_available_block_range)) {
2286 return None;
2287 }
2288 if block_hash != *block.hash() {
2289 error!(
2290 queried_block_hash = ?block_hash,
2291 actual_block_hash = ?block.hash(),
2292 "block not stored under hash"
2293 );
2294 debug_assert_eq!(&block_hash, block.hash());
2295 return None;
2296 }
2297 let block_signatures = ro_txn
2298 .read(block_hash)
2299 .expect("should read block signatures")
2300 .unwrap_or_else(|| self.get_default_block_signatures(&block));
2301 if block_signatures.is_verified().is_err() {
2302 error!(?block, "invalid block signatures for block");
2303 debug_assert!(block_signatures.is_verified().is_ok());
2304 return None;
2305 }
2306 Some(BlockWithSignatures::new(block, block_signatures))
2307 }
2308
2309 pub(crate) fn read_block_with_signatures_by_height(
2310 &self,
2311 height: u64,
2312 only_from_available_block_range: bool,
2313 ) -> Option<BlockWithSignatures> {
2314 if !(self.should_return_block(height, only_from_available_block_range)) {
2315 return None;
2316 }
2317 let ro_txn = self
2318 .block_store
2319 .checkout_ro()
2320 .expect("should create ro txn");
2321 let block: Block = ro_txn.read(height).expect("should read block")?;
2322 let hash = block.hash();
2323 let block_signatures = ro_txn
2324 .read(*hash)
2325 .expect("should read block signatures")
2326 .unwrap_or_else(|| self.get_default_block_signatures(&block));
2327 Some(BlockWithSignatures::new(block, block_signatures))
2328 }
2329
2330 pub(crate) fn read_highest_block_with_signatures(
2331 &self,
2332 only_from_available_block_range: bool,
2333 ) -> Option<BlockWithSignatures> {
2334 let ro_txn = self
2335 .block_store
2336 .checkout_ro()
2337 .expect("should create ro txn");
2338 let highest_block = if only_from_available_block_range {
2339 let height = self.highest_complete_block_height()?;
2340 ro_txn.read(height).expect("should read block")?
2341 } else {
2342 DataReader::<Tip, Block>::read(&ro_txn, Tip).expect("should read block")?
2343 };
2344 let hash = highest_block.hash();
2345 let block_signatures = match ro_txn.read(*hash).expect("should read block signatures") {
2346 Some(signatures) => signatures,
2347 None => self.get_default_block_signatures(&highest_block),
2348 };
2349 Some(BlockWithSignatures::new(highest_block, block_signatures))
2350 }
2351
2352 pub(crate) fn read_execution_info(
2353 &self,
2354 transaction_hash: TransactionHash,
2355 ) -> Option<ExecutionInfo> {
2356 let txn = self
2357 .block_store
2358 .checkout_ro()
2359 .expect("should create ro txn");
2360 let block_hash_and_height: BlockHashHeightAndEra = txn
2361 .read(transaction_hash)
2362 .expect("should read block hash and height")?;
2363 let execution_result = txn
2364 .read(transaction_hash)
2365 .expect("should read execution result");
2366 Some(ExecutionInfo {
2367 block_hash: block_hash_and_height.block_hash,
2368 block_height: block_hash_and_height.block_height,
2369 execution_result,
2370 })
2371 }
2372}