1#![recursion_limit = "256"]
2
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::Write;
6use std::net::SocketAddr;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use ::rand::{Rng, random};
12use anyhow::{Context, Result};
13use miden_lib::AuthScheme;
14use miden_lib::account::faucets::create_basic_fungible_faucet;
15use miden_lib::utils::Serializable;
16use miden_node_block_producer::{
17 BlockProducer,
18 DEFAULT_MAX_BATCHES_PER_BLOCK,
19 DEFAULT_MAX_TXS_PER_BATCH,
20};
21use miden_node_ntx_builder::NetworkTransactionBuilder;
22use miden_node_rpc::Rpc;
23use miden_node_store::{GenesisState, Store};
24use miden_node_utils::crypto::get_rpo_random_coin;
25use miden_objects::account::auth::AuthSecretKey;
26use miden_objects::account::{Account, AccountFile};
27use miden_objects::asset::TokenSymbol;
28use miden_objects::block::FeeParameters;
29use miden_objects::crypto::dsa::rpo_falcon512::SecretKey;
30use miden_objects::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET;
31use miden_objects::{Felt, ONE};
32use rand_chacha::ChaCha20Rng;
33use rand_chacha::rand_core::SeedableRng;
34use tokio::net::TcpListener;
35use tokio::sync::Barrier;
36use tokio::task::{Id, JoinSet};
37use url::Url;
38
/// Default block interval, in milliseconds (converted with `Duration::from_millis` in
/// `NodeBuilder::new`).
pub const DEFAULT_BLOCK_INTERVAL: u64 = 5_000;
/// Default batch interval, in milliseconds (converted with `Duration::from_millis` in
/// `NodeBuilder::new`).
pub const DEFAULT_BATCH_INTERVAL: u64 = 2_000;
/// Default local port the node's RPC listener binds to.
pub const DEFAULT_RPC_PORT: u16 = 57_291;
/// Name of the file, inside the data directory, that receives the serialized genesis account.
pub const GENESIS_ACCOUNT_FILE: &str = "account.mac";
/// Value passed as `grpc_timeout` to the RPC, store and block-producer components.
const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
44
/// Configuration for a locally running node, consumed by `NodeBuilder::start`.
///
/// Values are set through `NodeBuilder::new` and the `with_*` methods.
#[derive(Debug, Clone)]
pub struct NodeBuilder {
    /// Directory that receives the genesis account file and is handed to the store.
    data_directory: PathBuf,
    /// Block interval forwarded to the block-producer.
    block_interval: Duration,
    /// Batch interval forwarded to the block-producer.
    batch_interval: Duration,
    /// Port on `127.0.0.1` that the RPC listener binds to.
    rpc_port: u16,
}
52
53impl NodeBuilder {
54 pub fn new(data_directory: PathBuf) -> Self {
59 Self {
60 data_directory,
61 block_interval: Duration::from_millis(DEFAULT_BLOCK_INTERVAL),
62 batch_interval: Duration::from_millis(DEFAULT_BATCH_INTERVAL),
63 rpc_port: DEFAULT_RPC_PORT,
64 }
65 }
66
67 #[must_use]
69 pub fn with_block_interval(mut self, interval: Duration) -> Self {
70 self.block_interval = interval;
71 self
72 }
73 #[must_use]
75 pub fn with_batch_interval(mut self, interval: Duration) -> Self {
76 self.batch_interval = interval;
77 self
78 }
79
80 #[must_use]
82 pub fn with_rpc_port(mut self, port: u16) -> Self {
83 self.rpc_port = port;
84 self
85 }
86 #[allow(clippy::too_many_lines)]
91 pub async fn start(self) -> Result<NodeHandle> {
92 miden_node_utils::logging::setup_tracing(
93 miden_node_utils::logging::OpenTelemetry::Disabled,
94 )?;
95
96 let account_file =
97 generate_genesis_account().context("failed to create genesis account")?;
98
99 let filepath = self.data_directory.join(GENESIS_ACCOUNT_FILE);
104 File::create_new(&filepath)
105 .and_then(|mut file| file.write_all(&account_file.to_bytes()))
106 .with_context(|| {
107 format!("failed to write data for genesis account to file {}", filepath.display())
108 })?;
109
110 let version = 1;
111 let timestamp = SystemTime::now()
112 .duration_since(UNIX_EPOCH)
113 .expect("current timestamp should be greater than unix epoch")
114 .as_secs()
115 .try_into()
116 .expect("timestamp should fit into u32");
117 let genesis_state = GenesisState::new(
118 vec![account_file.account],
119 FeeParameters::new(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET.try_into().unwrap(), 0u32)
120 .unwrap(),
121 version,
122 timestamp,
123 );
124
125 Store::bootstrap(genesis_state, &self.data_directory)
127 .context("failed to bootstrap store")?;
128
129 let grpc_rpc = TcpListener::bind(format!("127.0.0.1:{}", self.rpc_port))
132 .await
133 .context("failed to bind to RPC gRPC endpoint")?;
134 let store_rpc_listener = TcpListener::bind("127.0.0.1:0")
135 .await
136 .context("failed to bind to store RPC gRPC endpoint")?;
137 let store_ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
138 .await
139 .context("failed to bind to store ntx-builder gRPC endpoint")?;
140 let store_block_producer_listener = TcpListener::bind("127.0.0.1:0")
141 .await
142 .context("failed to bind to store block-producer gRPC endpoint")?;
143
144 let store_rpc_address = store_rpc_listener
145 .local_addr()
146 .context("failed to retrieve the store's RPC gRPC address")?;
147 let store_block_producer_address = store_block_producer_listener
148 .local_addr()
149 .context("failed to retrieve the store's block-producer gRPC address")?;
150 let store_ntx_builder_address = store_ntx_builder_listener
151 .local_addr()
152 .context("failed to retrieve the store's ntx-builder gRPC address")?;
153
154 let block_producer_address = available_socket_addr()
155 .await
156 .context("failed to bind to block-producer gRPC endpoint")?;
157
158 let mut join_set = JoinSet::new();
161 let (store_id, _) = Self::start_store(
162 self.data_directory.clone(),
163 &mut join_set,
164 store_rpc_listener,
165 store_ntx_builder_listener,
166 store_block_producer_listener,
167 )
168 .context("failed to start store")?;
169
170 let checkpoint = Arc::new(Barrier::new(2));
171
172 let ntx_builder_id = Self::start_ntx_builder(
173 block_producer_address,
174 store_ntx_builder_address,
175 checkpoint.clone(),
176 &mut join_set,
177 );
178
179 let block_producer_id = self.start_block_producer(
180 block_producer_address,
181 store_block_producer_address,
182 checkpoint,
183 &mut join_set,
184 );
185
186 let rpc_id = join_set
187 .spawn(async move {
188 let store_url = Url::parse(&format!("http://{store_rpc_address}"))
189 .context("Failed to parse URL")?;
190 let block_producer_url = Some(
191 Url::parse(&format!("http://{block_producer_address}"))
192 .context("Failed to parse URL")?,
193 );
194
195 Rpc {
196 listener: grpc_rpc,
197 store_url,
198 block_producer_url,
199 grpc_timeout: DEFAULT_TIMEOUT_DURATION,
200 }
201 .serve()
202 .await
203 .context("failed while serving RPC component")
204 })
205 .id();
206
207 let component_ids = HashMap::from([
208 (store_id, "store"),
209 (block_producer_id, "block-producer"),
210 (rpc_id, "rpc"),
211 (ntx_builder_id, "ntx-builder"),
212 ]);
213
214 let component_result = join_set.join_next_with_id().await.unwrap();
216
217 let (id, err) = match component_result {
221 Ok((id, Ok(_))) => (id, Err(anyhow::anyhow!("Component completed unexpectedly"))),
222 Ok((id, Err(err))) => (id, Err(err)),
223 Err(join_err) => (join_err.id(), Err(join_err).context("Joining component task")),
224 };
225 let component = component_ids.get(&id).unwrap_or(&"unknown");
226
227 err.context(format!("Component {component} failed"))
231 }
232
233 fn start_store(
236 data_directory: PathBuf,
237 join_set: &mut JoinSet<Result<()>>,
238 rpc_listener: TcpListener,
239 ntx_builder_listener: TcpListener,
240 block_producer_listener: TcpListener,
241 ) -> Result<(Id, SocketAddr)> {
242 let store_address = rpc_listener
243 .local_addr()
244 .context("failed to retrieve the store's gRPC address")?;
245 Ok((
246 join_set
247 .spawn(async move {
248 Store {
249 data_directory,
250 rpc_listener,
251 block_producer_listener,
252 ntx_builder_listener,
253 grpc_timeout: DEFAULT_TIMEOUT_DURATION,
254 }
255 .serve()
256 .await
257 .context("failed while serving store component")
258 })
259 .id(),
260 store_address,
261 ))
262 }
263
264 fn start_block_producer(
267 &self,
268 block_producer_address: SocketAddr,
269 store_address: SocketAddr,
270 checkpoint: Arc<Barrier>,
271 join_set: &mut JoinSet<Result<()>>,
272 ) -> Id {
273 let batch_interval = self.batch_interval;
274 let block_interval = self.block_interval;
275 join_set
276 .spawn(async move {
277 let store_url = Url::parse(&format!("http://{store_address}"))
278 .context("Failed to parse URL")?;
279 BlockProducer {
280 block_producer_address,
281 store_url,
282 grpc_timeout: DEFAULT_TIMEOUT_DURATION,
283 batch_prover_url: None,
284 block_prover_url: None,
285 batch_interval,
286 block_interval,
287 max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH,
288 max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK,
289 production_checkpoint: checkpoint,
290 }
291 .serve()
292 .await
293 .context("failed while serving block-producer component")
294 })
295 .id()
296 }
297
298 fn start_ntx_builder(
300 block_producer_address: SocketAddr,
301 store_address: SocketAddr,
302 production_checkpoint: Arc<Barrier>,
303 join_set: &mut JoinSet<Result<()>>,
304 ) -> Id {
305 let store_url =
306 Url::parse(&format!("http://{}:{}/", store_address.ip(), store_address.port()))
307 .unwrap();
308 let block_producer_url = Url::parse(&format!(
309 "http://{}:{}/",
310 block_producer_address.ip(),
311 block_producer_address.port()
312 ))
313 .unwrap();
314
315 join_set
316 .spawn(async move {
317 NetworkTransactionBuilder::new(
318 store_url,
319 block_producer_url,
320 None,
321 Duration::from_millis(200),
322 production_checkpoint,
323 )
324 .serve_new()
325 .await
326 .context("failed while serving ntx builder component")
327 })
328 .id()
329 }
330}
331
332pub struct NodeHandle {
336 pub rpc_url: String,
337 pub rpc_handle: tokio::task::JoinHandle<()>,
338 pub block_producer_handle: tokio::task::JoinHandle<()>,
339 pub store_handle: tokio::task::JoinHandle<()>,
340}
341
342impl NodeHandle {
343 pub async fn stop(self) -> Result<()> {
345 self.rpc_handle.abort();
346 self.block_producer_handle.abort();
347 self.store_handle.abort();
348
349 let _ = self.rpc_handle.await;
351 let _ = self.block_producer_handle.await;
352 let _ = self.store_handle.await;
353
354 Ok(())
355 }
356}
357
358fn generate_genesis_account() -> anyhow::Result<AccountFile> {
362 let mut rng = ChaCha20Rng::from_seed(random());
363 let secret = SecretKey::with_rng(&mut get_rpo_random_coin(&mut rng));
364
365 let account = create_basic_fungible_faucet(
366 rng.random(),
367 TokenSymbol::try_from("TST").expect("TST should be a valid token symbol"),
368 12,
369 Felt::from(1_000_000u32),
370 miden_objects::account::AccountStorageMode::Public,
371 AuthScheme::RpoFalcon512 { pub_key: secret.public_key().into() },
372 )?;
373
374 let (id, vault, storage, code, ..) = account.into_parts();
383 let updated_account = Account::new_unchecked(id, vault, storage, code, ONE, None);
384
385 Ok(AccountFile::new(updated_account, vec![AuthSecretKey::RpoFalcon512(secret)]))
386}
387
388async fn available_socket_addr() -> Result<SocketAddr> {
389 let listener = TcpListener::bind("127.0.0.1:0").await.context("failed to bind to endpoint")?;
390 listener.local_addr().context("failed to retrieve the address")
391}