use crate::{
database::OnChainIterableKeyValueView,
service::adapters::{
BlockImporterAdapter,
ChainStateInfoProvider,
P2PAdapter,
PreconfirmationSender,
StaticGasPrice,
},
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
Result as StorageResult,
StorageAsRef,
tables::{
Coins,
ContractsRawCode,
Messages,
ProcessedTransactions,
},
};
use fuel_core_txpool::ports::{
BlockImporter,
ChainStateInfoProvider as ChainStateInfoProviderTrait,
GasPriceProvider,
TxStatusManager,
};
use fuel_core_types::{
blockchain::header::ConsensusParametersVersion,
entities::{
coins::coin::CompressedCoin,
relayer::message::Message,
},
fuel_tx::{
BlobId,
ConsensusParameters,
Transaction,
TxId,
UtxoId,
},
fuel_types::{
ContractId,
Nonce,
},
fuel_vm::BlobData,
services::{
block_importer::SharedImportResult,
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
PeerId,
TransactionGossipData,
},
preconfirmation::{
Preconfirmation,
PreconfirmationStatus,
SqueezedOut,
},
transaction_status::{
PreConfirmationStatus,
TransactionStatus,
statuses,
},
},
};
use std::sync::Arc;
use tokio::sync::broadcast;
impl BlockImporter for BlockImporterAdapter {
fn block_events(&self) -> BoxStream<SharedImportResult> {
self.events_shared_result()
}
}
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
fn broadcast_transaction(&self, transaction: Arc<Transaction>) -> anyhow::Result<()> {
match &self.service {
Some(service) => service.broadcast_transaction(transaction),
_ => Ok(()),
}
}
fn notify_gossip_transaction_validity(
&self,
message_info: GossipsubMessageInfo,
validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()> {
match &self.service {
Some(service) => {
service.notify_gossip_transaction_validity(message_info, validity)
}
_ => Ok(()),
}
}
}
#[cfg(feature = "p2p")]
impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
type GossipedTransaction = TransactionGossipData;
fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
use tokio_stream::{
StreamExt,
wrappers::BroadcastStream,
};
match &self.service {
Some(service) => Box::pin(
BroadcastStream::new(service.subscribe_tx())
.filter_map(|result| result.ok()),
),
_ => fuel_core_services::stream::IntoBoxStream::into_boxed(
tokio_stream::pending(),
),
}
}
fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
use tokio_stream::{
StreamExt,
wrappers::BroadcastStream,
};
match &self.service {
Some(service) => Box::pin(
BroadcastStream::new(service.subscribe_new_peers())
.filter_map(|result| result.ok()),
),
_ => Box::pin(fuel_core_services::stream::pending()),
}
}
}
#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
match &self.service {
Some(service) => service.get_all_transactions_ids_from_peer(peer_id).await,
_ => Ok(vec![]),
}
}
async fn request_txs(
&self,
peer_id: PeerId,
tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
match &self.service {
Some(service) => {
service
.get_full_transactions_from_peer(peer_id, tx_ids)
.await
}
_ => Ok(vec![]),
}
}
}
#[cfg(not(feature = "p2p"))]
const _: () = {
#[async_trait::async_trait]
impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
fn broadcast_transaction(
&self,
_transaction: Arc<Transaction>,
) -> anyhow::Result<()> {
Ok(())
}
fn notify_gossip_transaction_validity(
&self,
_message_info: GossipsubMessageInfo,
_validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()> {
Ok(())
}
}
impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
type GossipedTransaction = TransactionGossipData;
fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
Box::pin(fuel_core_services::stream::pending())
}
fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
Box::pin(fuel_core_services::stream::pending())
}
}
#[async_trait::async_trait]
impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
Ok(vec![])
}
async fn request_txs(
&self,
_peer_id: PeerId,
_tx_ids: Vec<TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
Ok(vec![])
}
}
};
impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValueView {
fn contains_tx(&self, tx_id: &TxId) -> StorageResult<bool> {
self.storage::<ProcessedTransactions>().contains_key(tx_id)
}
fn utxo(&self, utxo_id: &UtxoId) -> StorageResult<Option<CompressedCoin>> {
self.storage::<Coins>()
.get(utxo_id)
.map(|t| t.map(|t| t.into_owned()))
}
fn contract_exist(&self, contract_id: &ContractId) -> StorageResult<bool> {
self.storage::<ContractsRawCode>().contains_key(contract_id)
}
fn blob_exist(&self, blob_id: &BlobId) -> StorageResult<bool> {
self.storage::<BlobData>().contains_key(blob_id)
}
fn message(&self, id: &Nonce) -> StorageResult<Option<Message>> {
self.storage::<Messages>()
.get(id)
.map(|t| t.map(|t| t.into_owned()))
}
}
#[async_trait::async_trait]
impl GasPriceProvider for StaticGasPrice {
fn next_gas_price(&self) -> u64 {
self.gas_price
}
}
impl ChainStateInfoProviderTrait for ChainStateInfoProvider {
fn latest_consensus_parameters(
&self,
) -> (ConsensusParametersVersion, Arc<ConsensusParameters>) {
self.shared_state.latest_consensus_parameters_with_version()
}
}
impl TxStatusManager for PreconfirmationSender {
fn status_update(&self, tx_id: TxId, tx_status: TransactionStatus) {
let permit = self.sender_signature_service.try_reserve();
if let Ok(permit) = permit
&& let TransactionStatus::SqueezedOut(status) = &tx_status
{
let preconfirmation = Preconfirmation {
tx_id,
status: PreconfirmationStatus::SqueezedOut(SqueezedOut::new(
status.reason().to_string(),
tx_id,
)),
};
permit.send(vec![preconfirmation]);
}
self.tx_status_manager_adapter
.update_status(tx_id, tx_status);
}
fn preconfirmations_update_listener(
&self,
) -> broadcast::Receiver<(TxId, PreConfirmationStatus)> {
self.tx_status_manager_adapter
.preconfirmations_update_listener()
}
fn squeezed_out_txs(&self, statuses: Vec<(TxId, statuses::SqueezedOut)>) {
let permit = self.sender_signature_service.try_reserve();
if let Ok(permit) = permit {
let preconfirmations = statuses
.iter()
.map(|(tx_id, status)| Preconfirmation {
tx_id: *tx_id,
status: PreconfirmationStatus::SqueezedOut(SqueezedOut::new(
status.reason().to_string(),
*tx_id,
)),
})
.collect();
permit.send(preconfirmations);
}
self.tx_status_manager_adapter.update_statuses(statuses);
}
}