fuel_core/service/adapters/consensus_module/
poa.rs

1use crate::{
2    fuel_core_graphql_api::ports::ConsensusModulePort,
3    service::adapters::{
4        BlockImporterAdapter,
5        BlockProducerAdapter,
6        P2PAdapter,
7        PoAAdapter,
8        TxPoolAdapter,
9    },
10};
11use anyhow::anyhow;
12use fuel_core_poa::{
13    ports::{
14        BlockImporter,
15        P2pPort,
16        PredefinedBlocks,
17        TransactionPool,
18        TransactionsSource,
19    },
20    service::{
21        Mode,
22        SharedState,
23    },
24};
25use fuel_core_services::stream::BoxStream;
26use fuel_core_storage::transactional::Changes;
27use fuel_core_types::{
28    blockchain::block::Block,
29    fuel_types::BlockHeight,
30    services::{
31        block_importer::{
32            BlockImportInfo,
33            UncommittedResult as UncommittedImporterResult,
34        },
35        executor::UncommittedResult,
36    },
37    tai64::Tai64,
38};
39use std::path::{
40    Path,
41    PathBuf,
42};
43use tokio::{
44    sync::watch,
45    time::Instant,
46};
47use tokio_stream::{
48    StreamExt,
49    wrappers::BroadcastStream,
50};
51
52pub mod pre_confirmation_signature;
53
54impl PoAAdapter {
55    pub fn new(shared_state: Option<SharedState>) -> Self {
56        Self { shared_state }
57    }
58
59    pub async fn manually_produce_blocks(
60        &self,
61        start_time: Option<Tai64>,
62        mode: Mode,
63    ) -> anyhow::Result<()> {
64        self.shared_state
65            .as_ref()
66            .ok_or(anyhow!("The block production is disabled"))?
67            .manually_produce_block(start_time, mode)
68            .await
69    }
70}
71
72#[async_trait::async_trait]
73impl ConsensusModulePort for PoAAdapter {
74    async fn manually_produce_blocks(
75        &self,
76        start_time: Option<Tai64>,
77        number_of_blocks: u32,
78    ) -> anyhow::Result<()> {
79        self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
80            .await
81    }
82}
83
84impl TransactionPool for TxPoolAdapter {
85    fn new_txs_watcher(&self) -> watch::Receiver<()> {
86        self.service.get_new_executable_txs_notifier()
87    }
88}
89
90#[async_trait::async_trait]
91impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
92    async fn produce_and_execute_block(
93        &self,
94        height: BlockHeight,
95        block_time: Tai64,
96        source: TransactionsSource,
97        deadline: Instant,
98    ) -> anyhow::Result<UncommittedResult<Changes>> {
99        match source {
100            TransactionsSource::TxPool => {
101                self.block_producer
102                    .produce_and_execute_block_txpool(height, block_time, deadline)
103                    .await
104            }
105            TransactionsSource::SpecificTransactions(txs) => {
106                self.block_producer
107                    .produce_and_execute_block_transactions(height, block_time, txs)
108                    .await
109            }
110        }
111    }
112
113    async fn produce_predefined_block(
114        &self,
115        block: &Block,
116    ) -> anyhow::Result<UncommittedResult<Changes>> {
117        self.block_producer
118            .produce_and_execute_predefined(block, ())
119            .await
120    }
121}
122
123#[async_trait::async_trait]
124impl BlockImporter for BlockImporterAdapter {
125    async fn commit_result(
126        &self,
127        result: UncommittedImporterResult<Changes>,
128    ) -> anyhow::Result<()> {
129        self.block_importer
130            .commit_result(result)
131            .await
132            .map_err(Into::into)
133    }
134
135    fn block_stream(&self) -> BoxStream<BlockImportInfo> {
136        Box::pin(
137            BroadcastStream::new(self.block_importer.subscribe())
138                .filter_map(|result| result.ok())
139                .map(|result| BlockImportInfo::from(result.shared_result)),
140        )
141    }
142}
143
144#[cfg(feature = "p2p")]
145impl P2pPort for P2PAdapter {
146    fn reserved_peers_count(&self) -> BoxStream<usize> {
147        match &self.service {
148            Some(service) => Box::pin(
149                BroadcastStream::new(service.subscribe_reserved_peers_count())
150                    .filter_map(|result| result.ok()),
151            ),
152            _ => Box::pin(tokio_stream::pending()),
153        }
154    }
155}
156
157#[cfg(not(feature = "p2p"))]
158impl P2pPort for P2PAdapter {
159    fn reserved_peers_count(&self) -> BoxStream<usize> {
160        Box::pin(tokio_stream::pending())
161    }
162}
163
164pub struct InDirectoryPredefinedBlocks {
165    path_to_directory: Option<PathBuf>,
166}
167
168impl InDirectoryPredefinedBlocks {
169    pub fn new(path_to_directory: Option<PathBuf>) -> Self {
170        Self { path_to_directory }
171    }
172}
173
174impl PredefinedBlocks for InDirectoryPredefinedBlocks {
175    fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
176        let Some(path) = &self.path_to_directory else {
177            return Ok(None);
178        };
179
180        let block_height: u32 = (*height).into();
181        if block_exists(path.as_path(), block_height) {
182            let block_path = block_path(path.as_path(), block_height);
183            let block_bytes = std::fs::read(block_path)?;
184            let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
185            Ok(Some(block))
186        } else {
187            Ok(None)
188        }
189    }
190}
191
192pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
193    path_to_directory.join(format!("{}.json", block_height))
194}
195
196pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
197    block_path(path_to_directory, block_height).exists()
198}