node_builder/
lib.rs

1#![recursion_limit = "256"]
2
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::Write;
6use std::net::SocketAddr;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use ::rand::{Rng, random};
12use anyhow::{Context, Result};
13use miden_lib::AuthScheme;
14use miden_lib::account::faucets::create_basic_fungible_faucet;
15use miden_lib::utils::Serializable;
16use miden_node_block_producer::{
17    BlockProducer,
18    DEFAULT_MAX_BATCHES_PER_BLOCK,
19    DEFAULT_MAX_TXS_PER_BATCH,
20};
21use miden_node_ntx_builder::NetworkTransactionBuilder;
22use miden_node_rpc::Rpc;
23use miden_node_store::{GenesisState, Store};
24use miden_node_utils::crypto::get_rpo_random_coin;
25use miden_objects::account::auth::AuthSecretKey;
26use miden_objects::account::{Account, AccountFile};
27use miden_objects::asset::TokenSymbol;
28use miden_objects::block::FeeParameters;
29use miden_objects::crypto::dsa::rpo_falcon512::SecretKey;
30use miden_objects::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET;
31use miden_objects::{Felt, ONE};
32use rand_chacha::ChaCha20Rng;
33use rand_chacha::rand_core::SeedableRng;
34use tokio::net::TcpListener;
35use tokio::sync::Barrier;
36use tokio::task::{Id, JoinSet};
37use url::Url;
38
39pub const DEFAULT_BLOCK_INTERVAL: u64 = 5_000;
40pub const DEFAULT_BATCH_INTERVAL: u64 = 2_000;
41pub const DEFAULT_RPC_PORT: u16 = 57_291;
42pub const GENESIS_ACCOUNT_FILE: &str = "account.mac";
43const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
44
45/// Builder for configuring and starting a Miden node with all components.
46pub struct NodeBuilder {
47    data_directory: PathBuf,
48    block_interval: Duration,
49    batch_interval: Duration,
50    rpc_port: u16,
51}
52
53impl NodeBuilder {
54    // CONSTRUCTOR
55    // --------------------------------------------------------------------------------------------
56
57    /// Creates a new [`NodeBuilder`] with default settings.
58    pub fn new(data_directory: PathBuf) -> Self {
59        Self {
60            data_directory,
61            block_interval: Duration::from_millis(DEFAULT_BLOCK_INTERVAL),
62            batch_interval: Duration::from_millis(DEFAULT_BATCH_INTERVAL),
63            rpc_port: DEFAULT_RPC_PORT,
64        }
65    }
66
67    /// Sets the block production interval.
68    #[must_use]
69    pub fn with_block_interval(mut self, interval: Duration) -> Self {
70        self.block_interval = interval;
71        self
72    }
73    /// Sets the batch production interval.
74    #[must_use]
75    pub fn with_batch_interval(mut self, interval: Duration) -> Self {
76        self.batch_interval = interval;
77        self
78    }
79
80    /// Sets the RPC port.
81    #[must_use]
82    pub fn with_rpc_port(mut self, port: u16) -> Self {
83        self.rpc_port = port;
84        self
85    }
86    // START
87    // --------------------------------------------------------------------------------------------
88
89    /// Starts all node components and returns a handle to manage them.
90    #[allow(clippy::too_many_lines)]
91    pub async fn start(self) -> Result<NodeHandle> {
92        miden_node_utils::logging::setup_tracing(
93            miden_node_utils::logging::OpenTelemetry::Disabled,
94        )?;
95
96        let account_file =
97            generate_genesis_account().context("failed to create genesis account")?;
98
99        // Write account data to disk (including secrets).
100        //
101        // Without this the accounts would be inaccessible by the user.
102        // This is not used directly by the node, but rather by the owner / operator of the node.
103        let filepath = self.data_directory.join(GENESIS_ACCOUNT_FILE);
104        File::create_new(&filepath)
105            .and_then(|mut file| file.write_all(&account_file.to_bytes()))
106            .with_context(|| {
107                format!("failed to write data for genesis account to file {}", filepath.display())
108            })?;
109
110        let version = 1;
111        let timestamp = SystemTime::now()
112            .duration_since(UNIX_EPOCH)
113            .expect("current timestamp should be greater than unix epoch")
114            .as_secs()
115            .try_into()
116            .expect("timestamp should fit into u32");
117        let genesis_state = GenesisState::new(
118            vec![account_file.account],
119            FeeParameters::new(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET.try_into().unwrap(), 0u32)
120                .unwrap(),
121            version,
122            timestamp,
123        );
124
125        // Bootstrap the store database
126        Store::bootstrap(genesis_state, &self.data_directory)
127            .context("failed to bootstrap store")?;
128
129        // Start listening on all gRPC urls so that inter-component connections can be created
130        // before each component is fully started up.
131        let grpc_rpc = TcpListener::bind(format!("127.0.0.1:{}", self.rpc_port))
132            .await
133            .context("failed to bind to RPC gRPC endpoint")?;
134        let store_rpc_listener = TcpListener::bind("127.0.0.1:0")
135            .await
136            .context("failed to bind to store RPC gRPC endpoint")?;
137        let store_ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
138            .await
139            .context("failed to bind to store ntx-builder gRPC endpoint")?;
140        let store_block_producer_listener = TcpListener::bind("127.0.0.1:0")
141            .await
142            .context("failed to bind to store block-producer gRPC endpoint")?;
143
144        let store_rpc_address = store_rpc_listener
145            .local_addr()
146            .context("failed to retrieve the store's RPC gRPC address")?;
147        let store_block_producer_address = store_block_producer_listener
148            .local_addr()
149            .context("failed to retrieve the store's block-producer gRPC address")?;
150        let store_ntx_builder_address = store_ntx_builder_listener
151            .local_addr()
152            .context("failed to retrieve the store's ntx-builder gRPC address")?;
153
154        let block_producer_address = available_socket_addr()
155            .await
156            .context("failed to bind to block-producer gRPC endpoint")?;
157
158        // Start components
159
160        let mut join_set = JoinSet::new();
161        let (store_id, _) = Self::start_store(
162            self.data_directory.clone(),
163            &mut join_set,
164            store_rpc_listener,
165            store_ntx_builder_listener,
166            store_block_producer_listener,
167        )
168        .context("failed to start store")?;
169
170        let checkpoint = Arc::new(Barrier::new(2));
171
172        let ntx_builder_id = Self::start_ntx_builder(
173            block_producer_address,
174            store_ntx_builder_address,
175            checkpoint.clone(),
176            &mut join_set,
177        );
178
179        let block_producer_id = self.start_block_producer(
180            block_producer_address,
181            store_block_producer_address,
182            checkpoint,
183            &mut join_set,
184        );
185
186        let rpc_id = join_set
187            .spawn(async move {
188                let store_url = Url::parse(&format!("http://{store_rpc_address}"))
189                    .context("Failed to parse URL")?;
190                let block_producer_url = Some(
191                    Url::parse(&format!("http://{block_producer_address}"))
192                        .context("Failed to parse URL")?,
193                );
194
195                Rpc {
196                    listener: grpc_rpc,
197                    store_url,
198                    block_producer_url,
199                    grpc_timeout: DEFAULT_TIMEOUT_DURATION,
200                }
201                .serve()
202                .await
203                .context("failed while serving RPC component")
204            })
205            .id();
206
207        let component_ids = HashMap::from([
208            (store_id, "store"),
209            (block_producer_id, "block-producer"),
210            (rpc_id, "rpc"),
211            (ntx_builder_id, "ntx-builder"),
212        ]);
213
214        // SAFETY: The joinset is definitely not empty.
215        let component_result = join_set.join_next_with_id().await.unwrap();
216
217        // We expect components to run indefinitely, so we treat any return as fatal.
218        //
219        // Map all outcomes to an error, and provide component context.
220        let (id, err) = match component_result {
221            Ok((id, Ok(_))) => (id, Err(anyhow::anyhow!("Component completed unexpectedly"))),
222            Ok((id, Err(err))) => (id, Err(err)),
223            Err(join_err) => (join_err.id(), Err(join_err).context("Joining component task")),
224        };
225        let component = component_ids.get(&id).unwrap_or(&"unknown");
226
227        // We could abort and gracefully shutdown the other components, but since we're crashing the
228        // node there is no point.
229
230        err.context(format!("Component {component} failed"))
231    }
232
233    // Start store and return the tokio task ID plus the store's gRPC address. The store endpoint is
234    // available after loading completes.
235    fn start_store(
236        data_directory: PathBuf,
237        join_set: &mut JoinSet<Result<()>>,
238        rpc_listener: TcpListener,
239        ntx_builder_listener: TcpListener,
240        block_producer_listener: TcpListener,
241    ) -> Result<(Id, SocketAddr)> {
242        let store_address = rpc_listener
243            .local_addr()
244            .context("failed to retrieve the store's gRPC address")?;
245        Ok((
246            join_set
247                .spawn(async move {
248                    Store {
249                        data_directory,
250                        rpc_listener,
251                        block_producer_listener,
252                        ntx_builder_listener,
253                        grpc_timeout: DEFAULT_TIMEOUT_DURATION,
254                    }
255                    .serve()
256                    .await
257                    .context("failed while serving store component")
258                })
259                .id(),
260            store_address,
261        ))
262    }
263
264    /// Start block-producer and return the tokio task ID. The block-producer's endpoint is
265    /// available after loading completes.
266    fn start_block_producer(
267        &self,
268        block_producer_address: SocketAddr,
269        store_address: SocketAddr,
270        checkpoint: Arc<Barrier>,
271        join_set: &mut JoinSet<Result<()>>,
272    ) -> Id {
273        let batch_interval = self.batch_interval;
274        let block_interval = self.block_interval;
275        join_set
276            .spawn(async move {
277                let store_url = Url::parse(&format!("http://{store_address}"))
278                    .context("Failed to parse URL")?;
279                BlockProducer {
280                    block_producer_address,
281                    store_url,
282                    grpc_timeout: DEFAULT_TIMEOUT_DURATION,
283                    batch_prover_url: None,
284                    block_prover_url: None,
285                    batch_interval,
286                    block_interval,
287                    max_txs_per_batch: DEFAULT_MAX_TXS_PER_BATCH,
288                    max_batches_per_block: DEFAULT_MAX_BATCHES_PER_BLOCK,
289                    production_checkpoint: checkpoint,
290                }
291                .serve()
292                .await
293                .context("failed while serving block-producer component")
294            })
295            .id()
296    }
297
298    /// Start ntx-builder and return the tokio task ID.
299    fn start_ntx_builder(
300        block_producer_address: SocketAddr,
301        store_address: SocketAddr,
302        production_checkpoint: Arc<Barrier>,
303        join_set: &mut JoinSet<Result<()>>,
304    ) -> Id {
305        let store_url =
306            Url::parse(&format!("http://{}:{}/", store_address.ip(), store_address.port()))
307                .unwrap();
308        let block_producer_url = Url::parse(&format!(
309            "http://{}:{}/",
310            block_producer_address.ip(),
311            block_producer_address.port()
312        ))
313        .unwrap();
314
315        join_set
316            .spawn(async move {
317                NetworkTransactionBuilder::new(
318                    store_url,
319                    block_producer_url,
320                    None,
321                    Duration::from_millis(200),
322                    production_checkpoint,
323                )
324                .serve_new()
325                .await
326                .context("failed while serving ntx builder component")
327            })
328            .id()
329    }
330}
331
332// NODE HANDLE
333// ================================================================================================
334
335pub struct NodeHandle {
336    pub rpc_url: String,
337    pub rpc_handle: tokio::task::JoinHandle<()>,
338    pub block_producer_handle: tokio::task::JoinHandle<()>,
339    pub store_handle: tokio::task::JoinHandle<()>,
340}
341
342impl NodeHandle {
343    /// Stops all node components.
344    pub async fn stop(self) -> Result<()> {
345        self.rpc_handle.abort();
346        self.block_producer_handle.abort();
347        self.store_handle.abort();
348
349        // Wait for the tasks to complete
350        let _ = self.rpc_handle.await;
351        let _ = self.block_producer_handle.await;
352        let _ = self.store_handle.await;
353
354        Ok(())
355    }
356}
357
358// UTILS
359// ================================================================================================
360
361fn generate_genesis_account() -> anyhow::Result<AccountFile> {
362    let mut rng = ChaCha20Rng::from_seed(random());
363    let secret = SecretKey::with_rng(&mut get_rpo_random_coin(&mut rng));
364
365    let account = create_basic_fungible_faucet(
366        rng.random(),
367        TokenSymbol::try_from("TST").expect("TST should be a valid token symbol"),
368        12,
369        Felt::from(1_000_000u32),
370        miden_objects::account::AccountStorageMode::Public,
371        AuthScheme::RpoFalcon512 { pub_key: secret.public_key().into() },
372    )?;
373
374    // Force the account nonce to 1.
375    //
376    // By convention, a nonce of zero indicates a freshly generated local account that has yet
377    // to be deployed. An account is deployed onchain along within its first transaction which
378    // results in a non-zero nonce onchain.
379    //
380    // The genesis block is special in that accounts are "deployed" without transactions and
381    // therefore we need bump the nonce manually to uphold this invariant.
382    let (id, vault, storage, code, ..) = account.into_parts();
383    let updated_account = Account::new_unchecked(id, vault, storage, code, ONE, None);
384
385    Ok(AccountFile::new(updated_account, vec![AuthSecretKey::RpoFalcon512(secret)]))
386}
387
388async fn available_socket_addr() -> Result<SocketAddr> {
389    let listener = TcpListener::bind("127.0.0.1:0").await.context("failed to bind to endpoint")?;
390    listener.local_addr().context("failed to retrieve the address")
391}