use std::pin::Pin;
use std::sync::Arc;
use anyhow::Context;
use futures::Stream;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, mpsc};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;
use tonic::Status;
use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest};
use crate::chain_state::ChainState;
use crate::clients::StoreClient;
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::server::NtxBuilderRpcServer;
pub(crate) type MempoolEventStream =
Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;
pub struct NetworkTransactionBuilder {
config: NtxBuilderConfig,
coordinator: Coordinator,
store: StoreClient,
db: Db,
chain_state: Arc<RwLock<ChainState>>,
actor_context: AccountActorContext,
mempool_events: MempoolEventStream,
actor_request_rx: mpsc::Receiver<ActorRequest>,
}
impl NetworkTransactionBuilder {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
config: NtxBuilderConfig,
coordinator: Coordinator,
store: StoreClient,
db: Db,
chain_state: Arc<RwLock<ChainState>>,
actor_context: AccountActorContext,
mempool_events: MempoolEventStream,
actor_request_rx: mpsc::Receiver<ActorRequest>,
) -> Self {
Self {
config,
coordinator,
store,
db,
chain_state,
actor_context,
mempool_events,
actor_request_rx,
}
}
pub async fn run(self, listener: Option<TcpListener>) -> anyhow::Result<()> {
let mut join_set = JoinSet::new();
if let Some(listener) = listener {
let server = NtxBuilderRpcServer::new(self.db.clone());
join_set.spawn(async move {
server.serve(listener).await.context("ntx-builder gRPC server failed")
});
}
join_set.spawn(self.run_event_loop());
if let Some(result) = join_set.join_next().await {
result.context("ntx-builder task panicked")??;
}
Ok(())
}
async fn run_event_loop(mut self) -> anyhow::Result<()> {
let (account_tx, mut account_rx) =
mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
let account_loader_store = self.store.clone();
let mut account_loader_handle = tokio::spawn(async move {
account_loader_store
.stream_network_account_ids(account_tx)
.await
.context("failed to load network accounts from store")
});
loop {
tokio::select! {
result = self.coordinator.next() => {
if let Some(account_id) = result? {
self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
}
},
event = self.mempool_events.next() => {
let event = event
.context("mempool event stream ended")?
.context("mempool event stream failed")?;
self.handle_mempool_event(event).await?;
},
Some(account_id) = account_rx.recv() => {
self.handle_loaded_account(account_id).await?;
},
Some(request) = self.actor_request_rx.recv() => {
self.handle_actor_request(request).await?;
},
result = &mut account_loader_handle => {
result
.context("account loader task panicked")
.flatten()?;
tracing::info!("account loading from store completed");
account_loader_handle = tokio::spawn(std::future::pending());
},
}
}
}
#[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
async fn handle_loaded_account(
&mut self,
account_id: NetworkAccountId,
) -> Result<(), anyhow::Error> {
let account = self
.store
.get_network_account(account_id)
.await
.context("failed to load account from store")?
.context("account should exist in store")?;
let block_num = self.chain_state.read().await.chain_tip_header.block_num();
let notes = self
.store
.get_unconsumed_network_notes(account_id, block_num.as_u32())
.await
.context("failed to load notes from store")?;
self.db
.sync_account_from_store(account_id, account.clone(), notes.clone())
.await
.context("failed to sync account to DB")?;
self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
Ok(())
}
#[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> {
match &event {
MempoolEvent::TransactionAdded { account_delta, .. } => {
self.coordinator
.write_event(&event)
.await
.context("failed to write TransactionAdded to DB")?;
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
if let Some(network_account) = AccountOrigin::transaction(delta) {
let is_creating_account = delta.is_full_state();
if is_creating_account {
self.coordinator.spawn_actor(network_account, &self.actor_context);
}
}
}
let inactive_targets = self.coordinator.send_targeted(&event);
for account_id in inactive_targets {
self.coordinator
.spawn_actor(AccountOrigin::store(account_id), &self.actor_context);
}
Ok(())
},
MempoolEvent::BlockCommitted { header, .. } => {
let result = self
.coordinator
.write_event(&event)
.await
.context("failed to write BlockCommitted to DB")?;
self.update_chain_tip(header.as_ref().clone()).await;
self.coordinator.notify_accounts(&result.accounts_to_notify);
Ok(())
},
MempoolEvent::TransactionsReverted(_) => {
let result = self
.coordinator
.write_event(&event)
.await
.context("failed to write TransactionsReverted to DB")?;
self.coordinator.notify_accounts(&result.accounts_to_notify);
Ok(())
},
}
}
async fn handle_actor_request(&mut self, request: ActorRequest) -> Result<(), anyhow::Error> {
match request {
ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
self.db
.notes_failed(failed_notes, block_num)
.await
.context("failed to mark notes as failed")?;
let _ = ack_tx.send(());
},
ActorRequest::CacheNoteScript { script_root, script } => {
self.db
.insert_note_script(script_root, &script)
.await
.context("failed to cache note script")?;
},
}
Ok(())
}
async fn update_chain_tip(&mut self, tip: BlockHeader) {
let mut chain_state = self.chain_state.write().await;
let mmr_tip = chain_state.chain_tip_header.clone();
Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);
chain_state.chain_tip_header = tip;
let pruned_block_height = (chain_state
.chain_mmr
.chain_length()
.as_usize()
.saturating_sub(self.config.max_block_count)) as u32;
Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
}
}