fuel-core-txpool 0.41.10

Transaction pool that manages transactions and their dependencies.
Documentation
use crate::{
    ports::{
        AtomicView,
        BlockImporter as BlockImporterTrait,
        ConsensusParametersProvider,
        GasPriceProvider,
        NotifyP2P,
        P2PRequests,
        P2PSubscriptions,
        TxPoolPersistentStorage,
        WasmChecker,
        WasmValidityError,
    },
    GasPrice,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
    Mappable,
    PredicateStorageRequirements,
    Result as StorageResult,
    StorageInspect,
    StorageRead,
    StorageSize,
};
use fuel_core_types::{
    blockchain::{
        header::ConsensusParametersVersion,
        SealedBlock,
    },
    entities::{
        coins::coin::CompressedCoin,
        relayer::message::Message,
    },
    fuel_tx::{
        BlobId,
        Bytes32,
        ConsensusParameters,
        Contract,
        ContractId,
        Transaction,
        TxId,
        UtxoId,
    },
    fuel_types::Nonce,
    fuel_vm::{
        BlobBytes,
        BlobData,
    },
    services::{
        block_importer::{
            ImportResult,
            SharedImportResult,
        },
        p2p::{
            GossipData,
            GossipsubMessageAcceptance,
            GossipsubMessageInfo,
            PeerId,
        },
    },
};
use std::{
    borrow::Cow,
    collections::HashMap,
    sync::{
        Arc,
        Mutex,
    },
};
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Default)]
pub struct Data {
    pub coins: HashMap<UtxoId, CompressedCoin>,
    pub contracts: HashMap<ContractId, Contract>,
    pub blobs: HashMap<BlobId, BlobBytes>,
    pub messages: HashMap<Nonce, Message>,
}

#[derive(Clone, Default)]
pub struct MockDb {
    pub data: Arc<Mutex<Data>>,
}

impl MockDb {
    pub fn insert_dummy_blob(&self, blob_id: BlobId) {
        self.data
            .lock()
            .unwrap()
            .blobs
            .insert(blob_id, vec![123; 123].into());
    }

    pub fn insert_message(&self, message: Message) {
        self.data
            .lock()
            .unwrap()
            .messages
            .insert(*message.id(), message);
    }
}

impl TxPoolPersistentStorage for MockDb {
    fn utxo(&self, utxo_id: &UtxoId) -> StorageResult<Option<CompressedCoin>> {
        Ok(self.data.lock().unwrap().coins.get(utxo_id).cloned())
    }

    fn contract_exist(&self, contract_id: &ContractId) -> StorageResult<bool> {
        Ok(self
            .data
            .lock()
            .unwrap()
            .contracts
            .contains_key(contract_id))
    }

    fn blob_exist(&self, blob_id: &BlobId) -> StorageResult<bool> {
        Ok(self.data.lock().unwrap().blobs.contains_key(blob_id))
    }

    fn message(&self, id: &Nonce) -> StorageResult<Option<Message>> {
        Ok(self.data.lock().unwrap().messages.get(id).cloned())
    }
}

impl StorageRead<BlobData> for MockDb {
    fn read(
        &self,
        key: &<BlobData as Mappable>::Key,
        offset: usize,
        buf: &mut [u8],
    ) -> Result<Option<usize>, Self::Error> {
        let table = self.data.lock().unwrap();
        let bytes = table.blobs.get(key);

        bytes
            .map(|bytes| {
                let bytes_len = bytes.as_ref().len();
                let start = offset;
                let end = offset.saturating_add(buf.len());

                if end > bytes_len {
                    return Err(());
                }

                let starting_from_offset = &bytes.as_ref()[start..end];
                buf[..].copy_from_slice(starting_from_offset);
                Ok(buf.len())
            })
            .transpose()
    }

    fn read_alloc(
        &self,
        key: &<BlobData as Mappable>::Key,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let table = self.data.lock().unwrap();
        let bytes = table.blobs.get(key);
        let bytes = bytes.map(|bytes| bytes.clone().0);
        Ok(bytes)
    }
}

impl StorageInspect<BlobData> for MockDb {
    type Error = ();

    fn get(
        &self,
        key: &<BlobData as Mappable>::Key,
    ) -> Result<Option<Cow<<BlobData as Mappable>::OwnedValue>>, Self::Error> {
        let table = self.data.lock().unwrap();
        let bytes = table.blobs.get(key);
        Ok(bytes.map(|b| Cow::Owned(b.clone())))
    }

    fn contains_key(
        &self,
        key: &<BlobData as Mappable>::Key,
    ) -> Result<bool, Self::Error> {
        Ok(self.data.lock().unwrap().blobs.contains_key(key))
    }
}

impl StorageSize<BlobData> for MockDb {
    fn size_of_value(
        &self,
        key: &<BlobData as Mappable>::Key,
    ) -> Result<Option<usize>, Self::Error> {
        Ok(self
            .data
            .lock()
            .unwrap()
            .blobs
            .get(key)
            .map(|blob| blob.0.len()))
    }
}

impl PredicateStorageRequirements for MockDb {
    fn storage_error_to_string(error: Self::Error) -> String {
        format!("{:?}", error)
    }
}

#[derive(Clone)]
pub struct MockDBProvider(pub MockDb);

impl AtomicView for MockDBProvider {
    type LatestView = MockDb;

    fn latest_view(&self) -> StorageResult<Self::LatestView> {
        Ok(self.0.clone())
    }
}

#[derive(Debug, Clone)]
pub struct MockTxPoolGasPrice {
    pub gas_price: GasPrice,
}

impl MockTxPoolGasPrice {
    pub fn new(gas_price: GasPrice) -> Self {
        Self { gas_price }
    }
}

impl GasPriceProvider for MockTxPoolGasPrice {
    fn next_gas_price(&self) -> GasPrice {
        self.gas_price
    }
}

pub struct MockWasmChecker {
    pub result: Result<(), WasmValidityError>,
}

impl MockWasmChecker {
    pub fn new(result: Result<(), WasmValidityError>) -> Self {
        Self { result }
    }
}

impl WasmChecker for MockWasmChecker {
    fn validate_uploaded_wasm(
        &self,
        _wasm_root: &Bytes32,
    ) -> Result<(), WasmValidityError> {
        self.result
    }
}

mockall::mock! {
    pub ConsensusParametersProvider {}

    impl ConsensusParametersProvider for ConsensusParametersProvider {
        fn latest_consensus_parameters(&self) -> (ConsensusParametersVersion, Arc<ConsensusParameters>);
    }
}

type GossipedTransaction = GossipData<Transaction>;

mockall::mock! {
    pub P2P {}

    impl P2PSubscriptions for P2P {
        type GossipedTransaction = GossipedTransaction;

        fn gossiped_transaction_events(&self) -> BoxStream<GossipedTransaction>;

        fn subscribe_new_peers(&self) -> BoxStream<PeerId>;
    }

    impl NotifyP2P for P2P {
        fn notify_gossip_transaction_validity(
            &self,
            message_info: GossipsubMessageInfo,
            validity: GossipsubMessageAcceptance,
        ) -> anyhow::Result<()>;

        fn broadcast_transaction(&self, transaction: Arc<Transaction>) -> anyhow::Result<()>;
    }

    #[async_trait::async_trait]
    impl P2PRequests for P2P {
        async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>>;

        async fn request_txs(
            &self,
            peer_id: PeerId,
            tx_ids: Vec<TxId>,
        ) -> anyhow::Result<Vec<Option<Transaction>>>;
    }
}

impl MockP2P {
    pub fn new_with_txs(txs: Vec<Transaction>) -> Self {
        let mut p2p = MockP2P::default();
        p2p.expect_gossiped_transaction_events().returning(move || {
            let txs_clone = txs.clone();
            let stream = fuel_core_services::stream::unfold(txs_clone, |mut txs| async {
                let tx = txs.pop();
                if let Some(tx) = tx {
                    Some((GossipData::new(tx, vec![], vec![]), txs))
                } else {
                    core::future::pending().await
                }
            });
            Box::pin(stream)
        });

        p2p
    }
}

mockall::mock! {
    pub Importer {}

    impl BlockImporterTrait for Importer {
        fn block_events(&self) -> BoxStream<SharedImportResult>;
    }
}

impl MockImporter {
    pub fn with_blocks(blocks: Vec<SealedBlock>) -> Self {
        let mut importer = MockImporter::default();
        importer.expect_block_events().returning(move || {
            let blocks = blocks.clone();
            let stream = fuel_core_services::stream::unfold(blocks, |mut blocks| async {
                let block = blocks.pop();
                if let Some(sealed_block) = block {
                    let result: SharedImportResult = Arc::new(
                        ImportResult::new_from_local(sealed_block, vec![], vec![]),
                    );

                    Some((result, blocks))
                } else {
                    core::future::pending().await
                }
            });
            Box::pin(stream)
        });
        importer
    }

    pub fn with_block_provider(block_provider: Receiver<SharedImportResult>) -> Self {
        let mut importer = MockImporter::default();
        importer
            .expect_block_events()
            .return_once(move || Box::pin(ReceiverStream::new(block_provider)));
        importer
    }
}