use std::pin::Pin;
use std::sync::Arc;
use anyhow::Context;
use futures::Stream;
use miden_node_utils::tasks::Tasks;
use miden_protocol::block::{BlockNumber, SignedBlock};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use crate::NtxBuilderConfig;
use crate::actor::ActorRequest;
use crate::chain_state::SharedChainState;
use crate::clients::RpcError;
use crate::committed_block::CommittedBlockEffects;
use crate::coordinator::Coordinator;
use crate::db::{Db, LoopDb};
use crate::server::NtxBuilderRpcServer;
enum SteadyStateAction {
Block(Box<Option<Result<(SignedBlock, BlockNumber), RpcError>>>),
Request(Option<ActorRequest>),
Respawn(Option<miden_protocol::account::AccountId>),
}
pub(crate) type BlockStream =
Pin<Box<dyn Stream<Item = Result<(SignedBlock, BlockNumber), RpcError>> + Send>>;
pub struct NetworkTransactionBuilder {
config: NtxBuilderConfig,
db: Db,
block_stream: BlockStream,
last_applied_block: BlockNumber,
chain: Arc<SharedChainState>,
coordinator: Coordinator,
actor_request_rx: mpsc::Receiver<ActorRequest>,
is_synced: bool,
}
impl NetworkTransactionBuilder {
pub(crate) fn new(
config: NtxBuilderConfig,
db: Db,
block_stream: BlockStream,
last_applied_block: BlockNumber,
chain: Arc<SharedChainState>,
coordinator: Coordinator,
actor_request_rx: mpsc::Receiver<ActorRequest>,
) -> Self {
Self {
config,
db,
block_stream,
last_applied_block,
chain,
coordinator,
actor_request_rx,
is_synced: false,
}
}
pub fn is_synced(&self) -> bool {
self.is_synced
}
pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> {
let mut tasks = Tasks::new();
let server = NtxBuilderRpcServer::new(self.db.clone(), self.config.max_note_attempts);
tasks.spawn("grpc-server", async move {
server.serve(listener).await.context("ntx-builder gRPC server failed")
});
tasks.spawn("event-loop", self.run_event_loop());
tasks.join_next_as_error().await.context("ntx-builder task failed")
}
async fn run_event_loop(mut self) -> anyhow::Result<()> {
let loop_db = self
.db
.pin_loop_connection()
.await
.context("failed to pin a database connection for the ntx-builder event loop")?;
loop {
let (block, committed_tip) = self.next_block().await?;
let local_tip = block.header().block_num();
self.apply_committed_block(&loop_db, block, committed_tip).await?;
if local_tip == committed_tip {
self.is_synced = true;
tracing::info!(block.number = %committed_tip, "ntx-builder is now in sync");
break;
}
}
let pending_accounts = loop_db
.accounts_with_pending_notes(self.config.max_note_attempts)
.await
.context("failed to load accounts with pending notes at catch-up")?;
tracing::info!(
num_accounts = pending_accounts.len(),
"spawning actors for accounts with carry-over pending notes",
);
for account_id in pending_accounts {
self.coordinator.spawn_actor(account_id);
}
loop {
let action = {
let block_stream = &mut self.block_stream;
let actor_request_rx = &mut self.actor_request_rx;
let coordinator = &mut self.coordinator;
tokio::select! {
block = block_stream.next() => SteadyStateAction::Block(Box::new(block)),
request = actor_request_rx.recv() => SteadyStateAction::Request(request),
respawn = coordinator.next() => SteadyStateAction::Respawn(respawn?),
}
};
match action {
SteadyStateAction::Block(block) => {
let (block, committed_tip) =
(*block).context("block stream ended")?.context("block stream failed")?;
let effects = self
.apply_committed_block_with_effects(&loop_db, block, committed_tip)
.await?;
self.coordinator.handle_committed_block(&effects);
},
SteadyStateAction::Request(request) => {
let Some(request) = request else {
anyhow::bail!("actor request channel closed unexpectedly");
};
handle_actor_request(&loop_db, request).await?;
},
SteadyStateAction::Respawn(respawn) => {
if let Some(account_id) = respawn {
tracing::info!(
account.id = %account_id,
"respawning actor that shut down with a pending notification",
);
self.coordinator.spawn_actor(account_id);
}
},
}
}
}
async fn next_block(&mut self) -> anyhow::Result<(SignedBlock, BlockNumber)> {
self.block_stream
.next()
.await
.context("block stream ended")?
.context("block stream failed")
}
async fn apply_committed_block(
&mut self,
loop_db: &LoopDb,
block: SignedBlock,
committed_tip: BlockNumber,
) -> anyhow::Result<()> {
self.apply_committed_block_with_effects(loop_db, block, committed_tip)
.await
.map(drop)
}
#[tracing::instrument(
name = "ntx.builder.apply_committed_block",
skip(self, loop_db, block),
fields(block_num = %block.header().block_num(), %committed_tip),
)]
async fn apply_committed_block_with_effects(
&mut self,
loop_db: &LoopDb,
block: SignedBlock,
committed_tip: BlockNumber,
) -> anyhow::Result<CommittedBlockEffects> {
let header = block.header().clone();
let block_num = header.block_num();
let effects = CommittedBlockEffects::from_signed_block(&block);
self.chain.update_chain_tip(header, self.config.max_block_count);
let next_mmr = self.chain.current_mmr();
loop_db
.apply_committed_block(effects.clone(), next_mmr)
.await
.context("failed to apply committed block to DB")?;
self.last_applied_block = block_num;
Ok(effects)
}
}
async fn handle_actor_request(loop_db: &LoopDb, request: ActorRequest) -> anyhow::Result<()> {
match request {
ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
loop_db
.notes_failed(failed_notes, block_num)
.await
.context("failed to persist note failure")?;
let _ = ack_tx.send(());
},
ActorRequest::CacheNoteScript { script_root, script } => {
loop_db
.insert_note_script(script_root, &script)
.await
.context("failed to cache note script")?;
},
}
Ok(())
}