use std::pin::Pin;
use std::sync::Arc;
use anyhow::Context;
use futures::Stream;
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::block::BlockHeader;
use miden_protocol::crypto::merkle::mmr::PartialMmr;
use miden_protocol::transaction::PartialBlockchain;
use tokio::sync::{RwLock, mpsc};
use tokio_stream::StreamExt;
use tonic::Status;
use crate::NtxBuilderConfig;
use crate::actor::{AccountActorContext, AccountOrigin, ActorNotification};
use crate::coordinator::Coordinator;
use crate::db::Db;
use crate::store::StoreClient;
/// Chain tip header paired with a shared partial view of the blockchain.
///
/// Cloning this struct clones the header and bumps the reference count of `chain_mmr`; the
/// partial blockchain itself is not copied by `Clone`.
#[derive(Debug, Clone)]
pub struct ChainState {
/// Header of the block most recently recorded as the chain tip.
pub chain_tip_header: BlockHeader,
/// Partial blockchain held behind an [`Arc`]. When the tip advances, the builder adds the
/// previous tip header to this structure before storing the new tip.
pub chain_mmr: Arc<PartialBlockchain>,
}
impl ChainState {
    /// Builds a chain state from a tip header and a partial MMR.
    ///
    /// The partial MMR is wrapped into a [`PartialBlockchain`] (with no additional blocks
    /// supplied) and stored behind an [`Arc`].
    ///
    /// # Panics
    ///
    /// Panics if [`PartialBlockchain::new`] rejects the given partial MMR.
    pub(crate) fn new(chain_tip_header: BlockHeader, chain_mmr: PartialMmr) -> Self {
        let blockchain = PartialBlockchain::new(chain_mmr, [])
            .expect("partial blockchain should build from partial mmr");
        let chain_mmr = Arc::new(blockchain);

        Self { chain_tip_header, chain_mmr }
    }

    /// Consumes the state and returns the tip header and the shared partial blockchain.
    pub fn into_parts(self) -> (BlockHeader, Arc<PartialBlockchain>) {
        let Self { chain_tip_header, chain_mmr } = self;
        (chain_tip_header, chain_mmr)
    }
}
/// Pinned, boxed, `Send` stream of mempool events.
///
/// Each item is either a [`MempoolEvent`] or the tonic [`Status`] describing why the stream
/// failed to produce one.
pub(crate) type MempoolEventStream =
Pin<Box<dyn Stream<Item = Result<MempoolEvent, Status>> + Send>>;
/// Event loop that keeps account actors in sync with the store and the mempool.
///
/// The builder is consumed by [`NetworkTransactionBuilder::run`], which multiplexes over
/// coordinator results, mempool events, accounts loaded from the store and actor
/// notifications.
pub struct NetworkTransactionBuilder {
/// Builder configuration; supplies `account_channel_capacity` and `max_block_count`.
config: NtxBuilderConfig,
/// Spawns and cancels account actors, persists mempool events and forwards them to actors.
coordinator: Coordinator,
/// Store client used to stream network account IDs and to fetch accounts and their
/// unconsumed notes.
store: StoreClient,
/// Local database that receives synced accounts, failed-note markers and cached note scripts.
db: Db,
/// Chain tip and partial blockchain, shared behind an async read-write lock.
chain_state: Arc<RwLock<ChainState>>,
/// Context handed to the coordinator whenever a new actor is spawned.
actor_context: AccountActorContext,
/// Incoming mempool events.
mempool_events: MempoolEventStream,
/// Receiving end of the channel carrying actor notifications.
notification_rx: mpsc::Receiver<ActorNotification>,
}
impl NetworkTransactionBuilder {
/// Assembles a builder from its already-constructed components.
///
/// Every argument is stored unchanged in the field of the same name; no work is started until
/// `run` is called.
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
config: NtxBuilderConfig,
coordinator: Coordinator,
store: StoreClient,
db: Db,
chain_state: Arc<RwLock<ChainState>>,
actor_context: AccountActorContext,
mempool_events: MempoolEventStream,
notification_rx: mpsc::Receiver<ActorNotification>,
) -> Self {
Self {
config,
coordinator,
store,
db,
chain_state,
actor_context,
mempool_events,
notification_rx,
}
}
/// Runs the builder's event loop until an error forces it to stop.
///
/// A background task is spawned that streams network account IDs from the store into a
/// bounded channel (capacity `config.account_channel_capacity`). The loop then reacts to
/// whichever of the following is ready:
///
/// - a result from the coordinator,
/// - the next mempool event,
/// - an account ID produced by the loader task,
/// - a notification sent by an actor,
/// - completion of the loader task.
///
/// # Errors
///
/// The loop has no successful exit; this function only returns with an error, when:
///
/// - the coordinator reports an error,
/// - the mempool event stream ends or yields an error,
/// - handling a mempool event or a loaded account fails,
/// - the loader task fails to join or returns an error.
pub async fn run(mut self) -> anyhow::Result<()> {
    let (account_tx, mut account_rx) =
        mpsc::channel::<NetworkAccountId>(self.config.account_channel_capacity);
    let account_loader_store = self.store.clone();
    let mut account_loader_handle = tokio::spawn(async move {
        account_loader_store
            .stream_network_account_ids(account_tx)
            .await
            .context("failed to load network accounts from store")
    });
    // A `JoinHandle` must not be polled again after it has produced its output. Once the
    // loader has finished, this flag disables the corresponding `select!` branch instead of
    // parking the branch on a freshly spawned task that would never complete.
    let mut account_loading_done = false;

    loop {
        tokio::select! {
            result = self.coordinator.next() => {
                result?;
            },
            event = self.mempool_events.next() => {
                let event = event
                    .context("mempool event stream ended")?
                    .context("mempool event stream failed")?;
                self.handle_mempool_event(event.into()).await?;
            },
            // `recv()` yields `None` once the loader has dropped its sender; the pattern then
            // fails to match and the branch is skipped for that iteration.
            Some(account_id) = account_rx.recv() => {
                self.handle_loaded_account(account_id).await?;
            },
            Some(notification) = self.notification_rx.recv() => {
                self.handle_actor_notification(notification).await;
            },
            result = &mut account_loader_handle, if !account_loading_done => {
                // The outer error is a join failure, the inner one the loader's own result.
                result
                    .context("account loader task panicked")
                    .flatten()?;
                tracing::info!("account loading from store completed");
                account_loading_done = true;
            },
        }
    }
}
/// Syncs one network account from the store into the local DB and spawns its actor.
///
/// The account and its unconsumed network notes (as of the current chain tip block number)
/// are fetched from the store, written to the DB, and then an actor with a store origin is
/// spawned through the coordinator.
///
/// # Errors
///
/// Returns an error if the account or its notes cannot be loaded, if the store has no such
/// account, if the DB sync fails, or if the coordinator fails to spawn the actor.
#[tracing::instrument(name = "ntx.builder.handle_loaded_account", skip(self, account_id))]
async fn handle_loaded_account(
    &mut self,
    account_id: NetworkAccountId,
) -> Result<(), anyhow::Error> {
    let account = self
        .store
        .get_network_account(account_id)
        .await
        .context("failed to load account from store")?
        .context("account should exist in store")?;

    // The read guard is a temporary and is released at the end of this statement, so the
    // lock is not held across the store request below.
    let block_num = self.chain_state.read().await.chain_tip_header.block_num();
    let notes = self
        .store
        .get_unconsumed_network_notes(account_id, block_num.as_u32())
        .await
        .context("failed to load notes from store")?;

    // Neither value is needed afterwards, so both are moved into the DB call rather than
    // cloned.
    self.db
        .sync_account_from_store(account_id, account, notes)
        .await
        .context("failed to sync account to DB")?;

    self.coordinator
        .spawn_actor(AccountOrigin::store(account_id), &self.actor_context)
        .await?;
    Ok(())
}
/// Applies a single mempool event.
///
/// Every event is first persisted through the coordinator's `write_event`; what follows
/// depends on the variant:
///
/// - `TransactionAdded`: if the transaction carries a full-state delta for which
///   `AccountOrigin::transaction` yields an origin, an actor is spawned for it; the event is
///   then sent to targeted actors.
/// - `BlockCommitted`: the chain tip is advanced and the event is broadcast.
/// - `TransactionsReverted`: the event is broadcast and the actors of the accounts returned by
///   `write_event` are cancelled.
///
/// # Errors
///
/// Returns an error if persisting the event, spawning an actor, or the targeted send fails.
#[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))]
async fn handle_mempool_event(
    &mut self,
    event: Arc<MempoolEvent>,
) -> Result<(), anyhow::Error> {
    match &*event {
        MempoolEvent::TransactionAdded { account_delta, .. } => {
            self.coordinator
                .write_event(&event)
                .await
                .context("failed to write TransactionAdded to DB")?;

            // Only a delta that maps to an origin and carries full state leads to a spawn.
            let created_account = match account_delta {
                Some(AccountUpdateDetails::Delta(delta)) => {
                    AccountOrigin::transaction(delta).filter(|_| delta.is_full_state())
                },
                _ => None,
            };
            if let Some(origin) = created_account {
                self.coordinator.spawn_actor(origin, &self.actor_context).await?;
            }

            self.coordinator.send_targeted(&event).await?;
        },
        MempoolEvent::BlockCommitted { header, .. } => {
            self.coordinator
                .write_event(&event)
                .await
                .context("failed to write BlockCommitted to DB")?;

            let new_tip = header.as_ref().clone();
            self.update_chain_tip(new_tip).await;
            self.coordinator.broadcast(Arc::clone(&event)).await;
        },
        MempoolEvent::TransactionsReverted(_) => {
            let reverted = self
                .coordinator
                .write_event(&event)
                .await
                .context("failed to write TransactionsReverted to DB")?;

            self.coordinator.broadcast(Arc::clone(&event)).await;

            for reverted_id in &reverted {
                self.coordinator.cancel_actor(reverted_id);
            }
        },
    }

    Ok(())
}
/// Persists the effect of a notification sent by an actor.
///
/// `NotesFailed` marks the given nullifiers as failed in the DB; `CacheNoteScript` stores the
/// note script under its root. DB failures are logged and otherwise ignored, so this method
/// never fails.
async fn handle_actor_notification(&mut self, notification: ActorNotification) {
    match notification {
        ActorNotification::NotesFailed { nullifiers, block_num } => {
            let failure = self.db.notes_failed(nullifiers, block_num).await.err();
            if let Some(err) = failure {
                tracing::error!(err = %err, "failed to mark notes as failed");
            }
        },
        ActorNotification::CacheNoteScript { script_root, script } => {
            let failure = self.db.insert_note_script(script_root, &script).await.err();
            if let Some(err) = failure {
                tracing::error!(err = %err, "failed to cache note script");
            }
        },
    }
}
async fn update_chain_tip(&mut self, tip: BlockHeader) {
let mut chain_state = self.chain_state.write().await;
let mmr_tip = chain_state.chain_tip_header.clone();
Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true);
chain_state.chain_tip_header = tip;
let pruned_block_height = (chain_state
.chain_mmr
.chain_length()
.as_usize()
.saturating_sub(self.config.max_block_count)) as u32;
Arc::make_mut(&mut chain_state.chain_mmr).prune_to(..pruned_block_height.into());
}
}