1use std::num::{NonZeroU16, NonZeroUsize};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use builder::BlockStream;
8use chain_state::SharedChainState;
9use clients::RpcClient;
10use db::Db;
11use miden_node_utils::ErrorReport;
12use miden_node_utils::lru_cache::LruCache;
13use miden_protocol::block::{BlockNumber, SignedBlock};
14use miden_remote_prover_client::RemoteTransactionProver;
15use tokio::sync::mpsc;
16use tonic::metadata::AsciiMetadataValue;
17use url::Url;
18
19use crate::actor::{AccountActorContext, ActorConfig, GrpcClients, State};
20use crate::coordinator::Coordinator;
21
/// Crate-wide alias for a reference-counted, type-erased error that implements
/// [`ErrorReport`] and is `Send + Sync`.
///
/// NOTE(review): the name suggests it carries note-processing failures; the users of this alias
/// are not visible in this file — confirm against them.
pub(crate) type NoteError = Arc<dyn ErrorReport + Send + Sync>;
23
24mod actor;
25mod builder;
26mod chain_state;
27mod clients;
28mod committed_block;
29mod coordinator;
30pub(crate) mod db;
31pub mod server;
32
33#[cfg(test)]
34pub(crate) mod test_utils;
35
36pub use builder::NetworkTransactionBuilder;
37
38pub async fn bootstrap(database_filepath: PathBuf, genesis: &SignedBlock) -> anyhow::Result<()> {
50 validate_genesis_block(genesis).context("genesis block validation failed")?;
51 db::Db::bootstrap(database_filepath, genesis).await
52}
53
54pub fn migrate(database_filepath: impl AsRef<Path>) -> anyhow::Result<()> {
56 db::Db::migrate(database_filepath).context("failed to apply ntx-builder database migrations")
57}
58
59fn validate_genesis_block(block: &SignedBlock) -> anyhow::Result<()> {
60 anyhow::ensure!(
61 block.header().block_num() == BlockNumber::GENESIS,
62 "expected genesis block number (0), got {}",
63 block.header().block_num(),
64 );
65
66 anyhow::ensure!(
67 block
68 .signature()
69 .verify(block.header().commitment(), block.header().validator_key()),
70 "genesis block signature verification failed",
71 );
72
73 Ok(())
74}
75
#[cfg(test)]
mod bootstrap_tests {
    use super::*;

    /// The block returned by `mock_genesis_block` must be rejected by `validate_genesis_block`
    /// with an error that mentions the failed signature verification.
    #[test]
    fn validate_genesis_block_rejects_invalid_signature() {
        let genesis = crate::test_utils::mock_genesis_block();
        let failure =
            validate_genesis_block(&genesis).expect_err("invalid signature should fail");

        let rendered = failure.to_string();
        assert!(
            rendered.contains("signature verification failed"),
            "unexpected error: {failure}",
        );
    }
}
91
/// Name of this component.
///
/// NOTE(review): not referenced in the visible part of this file; presumably used as a
/// tracing/telemetry target elsewhere — confirm.
const COMPONENT: &str = "miden-ntx-builder";

/// Default for [`NtxBuilderConfig::max_notes_per_tx`].
const DEFAULT_MAX_NOTES_PER_TX: NonZeroUsize = NonZeroUsize::new(20).expect("literal is non-zero");
// Compile-time guard: the default must not exceed `miden_tx::MAX_NUM_CHECKER_NOTES`.
const _: () = assert!(DEFAULT_MAX_NOTES_PER_TX.get() <= miden_tx::MAX_NUM_CHECKER_NOTES);

/// Default for [`NtxBuilderConfig::max_concurrent_txs`].
const DEFAULT_MAX_CONCURRENT_TXS: usize = 4;

/// Default for [`NtxBuilderConfig::max_block_count`].
const DEFAULT_MAX_BLOCK_COUNT: usize = 4;

/// Default for [`NtxBuilderConfig::account_channel_capacity`].
const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000;

/// Default for [`NtxBuilderConfig::max_note_attempts`].
const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30;

/// Default for [`NtxBuilderConfig::script_cache_size`].
const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize =
    NonZeroUsize::new(1_000).expect("literal is non-zero");

/// Default for [`NtxBuilderConfig::idle_timeout`] (five minutes).
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Default for [`NtxBuilderConfig::max_account_crashes`].
const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10;

/// Default for [`NtxBuilderConfig::request_backoff_initial`] (100 milliseconds).
const DEFAULT_REQUEST_BACKOFF_INITIAL: Duration = Duration::from_millis(100);

/// Default for [`NtxBuilderConfig::request_backoff_max`] (30 seconds).
const DEFAULT_REQUEST_BACKOFF_MAX: Duration = Duration::from_secs(30);

/// Default for [`NtxBuilderConfig::max_cycles`]: 2^19 = 524,288.
const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19;
139
/// Default for [`NtxBuilderConfig::tx_expiration_delta`].
///
/// NOTE(review): the unit is presumably blocks — confirm against `actor::expiration_tx_script`.
const DEFAULT_TX_EXPIRATION_DELTA: NonZeroU16 =
    NonZeroU16::new(30).expect("literal is non-zero");
145
/// Configuration for the network transaction builder.
///
/// Create it with [`NtxBuilderConfig::new`], adjust individual settings through the `with_*`
/// methods, and turn it into a running builder with [`NtxBuilderConfig::build`].
#[derive(Debug, Clone)]
pub struct NtxBuilderConfig {
    /// URL used to construct the `RpcClient` in `build`.
    pub rpc_url: Url,

    /// Optional header value; when set, `build` constructs the RPC client through
    /// `RpcClient::new_with_auth` instead of `RpcClient::new`.
    pub rpc_auth_header: Option<AsciiMetadataValue>,

    /// URL handed to `RemoteTransactionProver::new`.
    pub tx_prover_url: Url,

    /// Capacity of the script `LruCache` shared through the actor state.
    pub script_cache_size: NonZeroUsize,

    /// Passed to `Coordinator::new`.
    ///
    /// NOTE(review): presumably the number of transactions processed concurrently — confirm
    /// against `Coordinator`.
    pub max_concurrent_txs: usize,

    /// Forwarded to `ActorConfig::max_notes_per_tx`.
    ///
    /// `with_max_notes_per_tx` asserts that this does not exceed
    /// `miden_tx::MAX_NUM_CHECKER_NOTES`.
    pub max_notes_per_tx: NonZeroUsize,

    /// Forwarded to `ActorConfig::max_note_attempts`.
    ///
    /// NOTE(review): presumably the number of attempts made per note before giving up — confirm.
    pub max_note_attempts: usize,

    /// NOTE(review): not read in this file; presumably consumed by `NetworkTransactionBuilder`
    /// — confirm.
    pub max_block_count: usize,

    /// Buffer size of the `mpsc` channel that carries `actor::ActorRequest`s.
    pub account_channel_capacity: usize,

    /// Forwarded to `ActorConfig::idle_timeout`.
    ///
    /// NOTE(review): presumably how long an account actor may stay idle before it is stopped
    /// — confirm.
    pub idle_timeout: Duration,

    /// Passed to `Coordinator::new`.
    ///
    /// NOTE(review): presumably the number of crashes tolerated per account — confirm.
    pub max_account_crashes: usize,

    /// Forwarded to `ActorConfig::max_cycles`.
    pub max_cycles: u32,

    /// Used to compile the expiration script via `actor::expiration_tx_script` and forwarded to
    /// `ActorConfig::tx_expiration_delta`.
    pub tx_expiration_delta: NonZeroU16,

    /// Initial request backoff, passed to the RPC client and to `ActorConfig`.
    pub request_backoff_initial: Duration,

    /// Maximum request backoff, passed to the RPC client and to `ActorConfig`.
    pub request_backoff_max: Duration,

    /// Path of the database opened by `build` through `Db::load_with_pool_size`.
    pub database_filepath: PathBuf,

    /// Connection pool size passed to `Db::load_with_pool_size`; `build` rejects values below 2.
    pub sqlite_connection_pool_size: NonZeroUsize,
}
221
222impl NtxBuilderConfig {
223 pub fn new(rpc_url: Url, tx_prover_url: Url, database_filepath: PathBuf) -> Self {
224 Self {
225 rpc_url,
226 rpc_auth_header: None,
227 tx_prover_url,
228 script_cache_size: DEFAULT_SCRIPT_CACHE_SIZE,
229 max_concurrent_txs: DEFAULT_MAX_CONCURRENT_TXS,
230 max_notes_per_tx: DEFAULT_MAX_NOTES_PER_TX,
231 max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS,
232 max_block_count: DEFAULT_MAX_BLOCK_COUNT,
233 account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY,
234 idle_timeout: DEFAULT_IDLE_TIMEOUT,
235 max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES,
236 max_cycles: DEFAULT_MAX_TX_CYCLES,
237 tx_expiration_delta: DEFAULT_TX_EXPIRATION_DELTA,
238 request_backoff_initial: DEFAULT_REQUEST_BACKOFF_INITIAL,
239 request_backoff_max: DEFAULT_REQUEST_BACKOFF_MAX,
240 database_filepath,
241 sqlite_connection_pool_size: miden_node_db::default_connection_pool_size(),
242 }
243 }
244
245 #[must_use]
247 pub fn with_rpc_auth_header(mut self, value: AsciiMetadataValue) -> Self {
248 self.rpc_auth_header = Some(value);
249 self
250 }
251
252 #[must_use]
254 pub fn with_script_cache_size(mut self, size: NonZeroUsize) -> Self {
255 self.script_cache_size = size;
256 self
257 }
258
259 #[must_use]
261 pub fn with_max_concurrent_txs(mut self, max: usize) -> Self {
262 self.max_concurrent_txs = max;
263 self
264 }
265
266 #[must_use]
272 pub fn with_max_notes_per_tx(mut self, max: NonZeroUsize) -> Self {
273 assert!(
274 max.get() <= miden_tx::MAX_NUM_CHECKER_NOTES,
275 "max_notes_per_tx ({}) exceeds MAX_NUM_CHECKER_NOTES ({})",
276 max,
277 miden_tx::MAX_NUM_CHECKER_NOTES
278 );
279 self.max_notes_per_tx = max;
280 self
281 }
282
283 #[must_use]
285 pub fn with_max_note_attempts(mut self, max: usize) -> Self {
286 self.max_note_attempts = max;
287 self
288 }
289
290 #[must_use]
292 pub fn with_max_block_count(mut self, max: usize) -> Self {
293 self.max_block_count = max;
294 self
295 }
296
297 #[must_use]
299 pub fn with_account_channel_capacity(mut self, capacity: usize) -> Self {
300 self.account_channel_capacity = capacity;
301 self
302 }
303
304 #[must_use]
308 pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
309 self.idle_timeout = timeout;
310 self
311 }
312
313 #[must_use]
315 pub fn with_max_account_crashes(mut self, max: usize) -> Self {
316 self.max_account_crashes = max;
317 self
318 }
319
320 #[must_use]
322 pub fn with_max_cycles(mut self, max: u32) -> Self {
323 self.max_cycles = max;
324 self
325 }
326
327 #[must_use]
330 pub fn with_tx_expiration_delta(mut self, delta: NonZeroU16) -> Self {
331 self.tx_expiration_delta = delta;
332 self
333 }
334
335 #[must_use]
338 pub fn with_request_backoff(mut self, initial: Duration, max: Duration) -> Self {
339 self.request_backoff_initial = initial;
340 self.request_backoff_max = max;
341 self
342 }
343
344 #[must_use]
346 pub fn with_sqlite_connection_pool_size(mut self, size: NonZeroUsize) -> Self {
347 self.sqlite_connection_pool_size = size;
348 self
349 }
350
351 pub async fn build(self) -> anyhow::Result<NetworkTransactionBuilder> {
365 anyhow::ensure!(
369 self.sqlite_connection_pool_size.get() >= 2,
370 "sqlite connection pool size must be at least 2 (the event loop pins one connection)",
371 );
372
373 let db = Db::load_with_pool_size(
375 self.database_filepath.clone(),
376 self.sqlite_connection_pool_size,
377 )
378 .await?;
379
380 let genesis_commitment = db.get_genesis_commitment().await.context(
382 "failed to read genesis commitment; \
383 run `miden-ntx-builder bootstrap` first",
384 )?;
385
386 let rpc = match self.rpc_auth_header.clone() {
387 Some(rpc_auth_header_value) => RpcClient::new_with_auth(
388 self.rpc_url.clone(),
389 Some(rpc_auth_header_value),
390 genesis_commitment,
391 self.request_backoff_initial,
392 self.request_backoff_max,
393 ),
394 None => RpcClient::new(
395 self.rpc_url.clone(),
396 genesis_commitment,
397 self.request_backoff_initial,
398 self.request_backoff_max,
399 ),
400 }?;
401
402 let (last_applied_block, header, mmr) =
406 db.get_chain_state().await.context("failed to read chain state")?.context(
407 "ntx-builder database has not been bootstrapped; \
408 run `miden-ntx-builder bootstrap` first",
409 )?;
410
411 let block_from = last_applied_block.child();
412
413 tracing::info!(
414 %block_from,
415 "ntx-builder opening committed-block subscription"
416 );
417
418 let raw_stream = rpc
419 .block_subscription_with_retry(block_from)
420 .await
421 .map_err(|err| anyhow::anyhow!(err))
422 .context("failed to subscribe to committed blocks")?;
423 let block_stream: BlockStream = Box::pin(raw_stream);
424
425 let chain = Arc::new(SharedChainState::new(header, mmr));
426
427 let (coordinator, actor_request_rx) =
428 self.build_coordinator(rpc, db.clone(), chain.clone())?;
429
430 Ok(NetworkTransactionBuilder::new(
431 self,
432 db,
433 block_stream,
434 last_applied_block,
435 chain,
436 coordinator,
437 actor_request_rx,
438 ))
439 }
440
441 fn build_coordinator(
447 &self,
448 rpc: RpcClient,
449 db: Db,
450 chain: Arc<SharedChainState>,
451 ) -> anyhow::Result<(Coordinator, mpsc::Receiver<actor::ActorRequest>)> {
452 let (request_tx, actor_request_rx) = mpsc::channel(self.account_channel_capacity);
453 let actor_context = AccountActorContext {
454 clients: GrpcClients {
455 rpc,
456 prover: RemoteTransactionProver::new(self.tx_prover_url.as_str()),
457 },
458 state: State {
459 db,
460 chain,
461 script_cache: LruCache::new(self.script_cache_size),
462 expiration_script: actor::expiration_tx_script(self.tx_expiration_delta)
463 .context("failed to compile network-tx expiration script")?,
464 },
465 config: ActorConfig {
466 max_notes_per_tx: self.max_notes_per_tx,
467 max_note_attempts: self.max_note_attempts,
468 idle_timeout: self.idle_timeout,
469 max_cycles: self.max_cycles,
470 tx_expiration_delta: self.tx_expiration_delta,
471 request_backoff_initial: self.request_backoff_initial,
472 request_backoff_max: self.request_backoff_max,
473 },
474 request_tx,
475 };
476 let coordinator =
477 Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, actor_context);
478
479 Ok((coordinator, actor_request_rx))
480 }
481}