use crate::fuel_core_graphql_api::{
database::arc_wrapper::ArcWrapper,
ports::{
OffChainDatabase,
OnChainDatabase,
},
};
use fuel_core_services::yield_stream::StreamYieldExt;
use fuel_core_storage::{
iter::{
BoxedIter,
IntoBoxedIter,
IterDirection,
},
not_found,
tables::Transactions,
transactional::AtomicView,
Error as StorageError,
IsNotFound,
Mappable,
PredicateStorageRequirements,
Result as StorageResult,
StorageInspect,
StorageRead,
StorageSize,
};
use fuel_core_types::{
blockchain::{
block::CompressedBlock,
consensus::Consensus,
primitives::{
BlockId,
DaBlockHeight,
},
},
entities::relayer::{
message::{
MerkleProof,
Message,
},
transaction::RelayedTransactionStatus,
},
fuel_tx::{
Address,
AssetId,
Bytes32,
ContractId,
Salt,
Transaction,
TxId,
TxPointer,
UtxoId,
},
fuel_types::{
BlobId,
BlockHeight,
Nonce,
},
fuel_vm::BlobData,
services::{
graphql_api::ContractBalance,
txpool::TransactionStatus,
},
};
use futures::Stream;
use std::{
borrow::Cow,
sync::Arc,
};
mod arc_wrapper;
pub type OnChainView = Arc<dyn OnChainDatabase>;
pub type OffChainView = Arc<dyn OffChainDatabase>;
pub struct ReadDatabase {
batch_size: usize,
genesis_height: BlockHeight,
on_chain: Box<dyn AtomicView<LatestView = OnChainView>>,
off_chain: Box<dyn AtomicView<LatestView = OffChainView>>,
}
impl ReadDatabase {
pub fn new<OnChain, OffChain>(
batch_size: usize,
genesis_height: BlockHeight,
on_chain: OnChain,
off_chain: OffChain,
) -> Self
where
OnChain: AtomicView + 'static,
OffChain: AtomicView + 'static,
OnChain::LatestView: OnChainDatabase,
OffChain::LatestView: OffChainDatabase,
{
Self {
batch_size,
genesis_height,
on_chain: Box::new(ArcWrapper::new(on_chain)),
off_chain: Box::new(ArcWrapper::new(off_chain)),
}
}
pub fn view(&self) -> StorageResult<ReadView> {
Ok(ReadView {
batch_size: self.batch_size,
genesis_height: self.genesis_height,
on_chain: self.on_chain.latest_view()?,
off_chain: self.off_chain.latest_view()?,
})
}
#[cfg(feature = "test-helpers")]
pub fn test_view(&self) -> ReadView {
self.view().expect("The latest view always should exist")
}
}
#[derive(Clone)]
pub struct ReadView {
pub(crate) batch_size: usize,
pub(crate) genesis_height: BlockHeight,
pub(crate) on_chain: OnChainView,
pub(crate) off_chain: OffChainView,
}
impl ReadView {
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> {
let result = self.on_chain.transaction(tx_id);
if result.is_not_found() {
if let Some(tx) = self.off_chain.old_transaction(tx_id)? {
Ok(tx)
} else {
Err(not_found!(Transactions))
}
} else {
result
}
}
pub async fn transactions(
&self,
tx_ids: Vec<TxId>,
) -> Vec<StorageResult<Transaction>> {
let result = tx_ids
.iter()
.map(|tx_id| self.transaction(tx_id))
.collect::<Vec<_>>();
tokio::task::yield_now().await;
result
}
pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
if *height >= self.genesis_height {
self.on_chain.block(height)
} else {
self.off_chain.old_block(height)
}
}
pub fn blocks(
&self,
height: Option<BlockHeight>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>> {
if let Some(height) = height {
match (height >= self.genesis_height, direction) {
(true, IterDirection::Forward) => {
self.on_chain.blocks(Some(height), direction)
}
(true, IterDirection::Reverse) => self
.on_chain
.blocks(Some(height), direction)
.chain(self.off_chain.old_blocks(None, direction))
.into_boxed(),
(false, IterDirection::Forward) => self
.off_chain
.old_blocks(Some(height), direction)
.chain(self.on_chain.blocks(None, direction))
.into_boxed(),
(false, IterDirection::Reverse) => {
self.off_chain.old_blocks(Some(height), direction)
}
}
} else {
match direction {
IterDirection::Forward => self
.off_chain
.old_blocks(None, direction)
.chain(self.on_chain.blocks(None, direction))
.into_boxed(),
IterDirection::Reverse => self
.on_chain
.blocks(None, direction)
.chain(self.off_chain.old_blocks(None, direction))
.into_boxed(),
}
}
}
pub fn latest_height(&self) -> StorageResult<BlockHeight> {
self.on_chain.latest_height()
}
pub fn consensus(&self, id: &BlockHeight) -> StorageResult<Consensus> {
if *id >= self.genesis_height {
self.on_chain.consensus(id)
} else {
self.off_chain.old_block_consensus(id)
}
}
}
impl StorageInspect<BlobData> for ReadView {
type Error = StorageError;
fn get(
&self,
key: &<BlobData as Mappable>::Key,
) -> StorageResult<Option<Cow<<BlobData as Mappable>::OwnedValue>>> {
StorageInspect::<BlobData>::get(self.on_chain.as_ref(), key)
}
fn contains_key(&self, key: &<BlobData as Mappable>::Key) -> StorageResult<bool> {
StorageInspect::<BlobData>::contains_key(self.on_chain.as_ref(), key)
}
}
impl StorageSize<BlobData> for ReadView {
fn size_of_value(&self, key: &BlobId) -> Result<Option<usize>, Self::Error> {
StorageSize::<BlobData>::size_of_value(self.on_chain.as_ref(), key)
}
}
impl StorageRead<BlobData> for ReadView {
fn read(&self, key: &BlobId, buf: &mut [u8]) -> Result<Option<usize>, Self::Error> {
StorageRead::<BlobData>::read(self.on_chain.as_ref(), key, buf)
}
fn read_alloc(&self, key: &BlobId) -> Result<Option<Vec<u8>>, Self::Error> {
StorageRead::<BlobData>::read_alloc(self.on_chain.as_ref(), key)
}
}
impl PredicateStorageRequirements for ReadView {
fn storage_error_to_string(error: Self::Error) -> String {
error.to_string()
}
}
impl ReadView {
pub fn all_messages(
&self,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<Message>> + '_ {
futures::stream::iter(self.on_chain.all_messages(start_message_id, direction))
.yield_each(self.batch_size)
}
pub fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool> {
self.on_chain.message_exists(nonce)
}
pub fn relayed_transaction_status(
&self,
id: Bytes32,
) -> StorageResult<Option<RelayedTransactionStatus>> {
self.off_chain.relayed_tx_status(id)
}
pub fn contract_balances(
&self,
contract: ContractId,
start_asset: Option<AssetId>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<ContractBalance>> + '_ {
futures::stream::iter(self.on_chain.contract_balances(
contract,
start_asset,
direction,
))
.yield_each(self.batch_size)
}
pub fn da_height(&self) -> StorageResult<DaBlockHeight> {
self.on_chain.da_height()
}
pub fn block_history_proof(
&self,
message_block_height: &BlockHeight,
commit_block_height: &BlockHeight,
) -> StorageResult<MerkleProof> {
self.on_chain
.block_history_proof(message_block_height, commit_block_height)
}
}
impl ReadView {
pub fn block_height(&self, block_id: &BlockId) -> StorageResult<BlockHeight> {
self.off_chain.block_height(block_id)
}
pub fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult<Vec<u8>> {
self.off_chain.da_compressed_block(height)
}
pub fn tx_status(&self, tx_id: &TxId) -> StorageResult<TransactionStatus> {
self.off_chain.tx_status(tx_id)
}
pub fn owned_coins_ids(
&self,
owner: &Address,
start_coin: Option<UtxoId>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<UtxoId>> + '_ {
let iter = self.off_chain.owned_coins_ids(owner, start_coin, direction);
futures::stream::iter(iter)
}
pub fn owned_message_ids(
&self,
owner: &Address,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<Nonce>> + '_ {
futures::stream::iter(self.off_chain.owned_message_ids(
owner,
start_message_id,
direction,
))
}
pub fn owned_transactions_ids(
&self,
owner: Address,
start: Option<TxPointer>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<(TxPointer, TxId)>> + '_ {
futures::stream::iter(
self.off_chain
.owned_transactions_ids(owner, start, direction),
)
}
pub fn contract_salt(&self, contract_id: &ContractId) -> StorageResult<Salt> {
self.off_chain.contract_salt(contract_id)
}
pub fn relayed_tx_status(
&self,
id: Bytes32,
) -> StorageResult<Option<RelayedTransactionStatus>> {
self.off_chain.relayed_tx_status(id)
}
pub fn message_is_spent(&self, nonce: &Nonce) -> StorageResult<bool> {
self.off_chain.message_is_spent(nonce)
}
}