fuel_core/service/adapters/consensus_module/
poa.rs1use crate::{
2 fuel_core_graphql_api::ports::ConsensusModulePort,
3 service::adapters::{
4 BlockImporterAdapter,
5 BlockProducerAdapter,
6 P2PAdapter,
7 PoAAdapter,
8 TxPoolAdapter,
9 },
10};
11use anyhow::anyhow;
12use fuel_core_poa::{
13 ports::{
14 BlockImporter,
15 P2pPort,
16 PredefinedBlocks,
17 TransactionPool,
18 TransactionsSource,
19 },
20 service::{
21 Mode,
22 SharedState,
23 },
24};
25use fuel_core_services::stream::BoxStream;
26use fuel_core_storage::transactional::Changes;
27use fuel_core_types::{
28 blockchain::block::Block,
29 fuel_types::BlockHeight,
30 services::{
31 block_importer::{
32 BlockImportInfo,
33 UncommittedResult as UncommittedImporterResult,
34 },
35 executor::UncommittedResult,
36 },
37 tai64::Tai64,
38};
39use std::path::{
40 Path,
41 PathBuf,
42};
43use tokio::{
44 sync::watch,
45 time::Instant,
46};
47use tokio_stream::{
48 StreamExt,
49 wrappers::BroadcastStream,
50};
51
52pub mod pre_confirmation_signature;
53
54impl PoAAdapter {
55 pub fn new(shared_state: Option<SharedState>) -> Self {
56 Self { shared_state }
57 }
58
59 pub async fn manually_produce_blocks(
60 &self,
61 start_time: Option<Tai64>,
62 mode: Mode,
63 ) -> anyhow::Result<()> {
64 self.shared_state
65 .as_ref()
66 .ok_or(anyhow!("The block production is disabled"))?
67 .manually_produce_block(start_time, mode)
68 .await
69 }
70}
71
72#[async_trait::async_trait]
73impl ConsensusModulePort for PoAAdapter {
74 async fn manually_produce_blocks(
75 &self,
76 start_time: Option<Tai64>,
77 number_of_blocks: u32,
78 ) -> anyhow::Result<()> {
79 self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
80 .await
81 }
82}
83
84impl TransactionPool for TxPoolAdapter {
85 fn new_txs_watcher(&self) -> watch::Receiver<()> {
86 self.service.get_new_executable_txs_notifier()
87 }
88}
89
90#[async_trait::async_trait]
91impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
92 async fn produce_and_execute_block(
93 &self,
94 height: BlockHeight,
95 block_time: Tai64,
96 source: TransactionsSource,
97 deadline: Instant,
98 ) -> anyhow::Result<UncommittedResult<Changes>> {
99 match source {
100 TransactionsSource::TxPool => {
101 self.block_producer
102 .produce_and_execute_block_txpool(height, block_time, deadline)
103 .await
104 }
105 TransactionsSource::SpecificTransactions(txs) => {
106 self.block_producer
107 .produce_and_execute_block_transactions(height, block_time, txs)
108 .await
109 }
110 }
111 }
112
113 async fn produce_predefined_block(
114 &self,
115 block: &Block,
116 ) -> anyhow::Result<UncommittedResult<Changes>> {
117 self.block_producer
118 .produce_and_execute_predefined(block, ())
119 .await
120 }
121}
122
123#[async_trait::async_trait]
124impl BlockImporter for BlockImporterAdapter {
125 async fn commit_result(
126 &self,
127 result: UncommittedImporterResult<Changes>,
128 ) -> anyhow::Result<()> {
129 self.block_importer
130 .commit_result(result)
131 .await
132 .map_err(Into::into)
133 }
134
135 fn block_stream(&self) -> BoxStream<BlockImportInfo> {
136 Box::pin(
137 BroadcastStream::new(self.block_importer.subscribe())
138 .filter_map(|result| result.ok())
139 .map(|result| BlockImportInfo::from(result.shared_result)),
140 )
141 }
142}
143
144#[cfg(feature = "p2p")]
145impl P2pPort for P2PAdapter {
146 fn reserved_peers_count(&self) -> BoxStream<usize> {
147 match &self.service {
148 Some(service) => Box::pin(
149 BroadcastStream::new(service.subscribe_reserved_peers_count())
150 .filter_map(|result| result.ok()),
151 ),
152 _ => Box::pin(tokio_stream::pending()),
153 }
154 }
155}
156
157#[cfg(not(feature = "p2p"))]
158impl P2pPort for P2PAdapter {
159 fn reserved_peers_count(&self) -> BoxStream<usize> {
160 Box::pin(tokio_stream::pending())
161 }
162}
163
164pub struct InDirectoryPredefinedBlocks {
165 path_to_directory: Option<PathBuf>,
166}
167
168impl InDirectoryPredefinedBlocks {
169 pub fn new(path_to_directory: Option<PathBuf>) -> Self {
170 Self { path_to_directory }
171 }
172}
173
174impl PredefinedBlocks for InDirectoryPredefinedBlocks {
175 fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
176 let Some(path) = &self.path_to_directory else {
177 return Ok(None);
178 };
179
180 let block_height: u32 = (*height).into();
181 if block_exists(path.as_path(), block_height) {
182 let block_path = block_path(path.as_path(), block_height);
183 let block_bytes = std::fs::read(block_path)?;
184 let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
185 Ok(Some(block))
186 } else {
187 Ok(None)
188 }
189 }
190}
191
192pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
193 path_to_directory.join(format!("{}.json", block_height))
194}
195
196pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
197 block_path(path_to_directory, block_height).exists()
198}