use crate::{blocks::Paused, Provider, RootProvider};
use alloy_consensus::BlockHeader;
use alloy_json_rpc::RpcError;
use alloy_network::{BlockResponse, Network};
use alloy_primitives::{
map::{B256HashMap, B256HashSet},
TxHash, B256,
};
use alloy_transport::{utils::Spawnable, TransportError};
use futures::{future::pending, stream::StreamExt, FutureExt, Stream};
use std::{
collections::{BTreeMap, VecDeque},
fmt,
future::Future,
sync::Arc,
time::Duration,
};
use tokio::{
select,
sync::{mpsc, oneshot},
};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use wasmtimer::{
std::Instant,
tokio::{interval, sleep_until},
};
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use {
std::time::Instant,
tokio::time::{interval, sleep_until},
};
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
#[error("failed to register pending transaction to watch")]
FailedToRegister,
#[error(transparent)]
TransportError(#[from] TransportError),
#[error(transparent)]
Recv(#[from] oneshot::error::RecvError),
#[error(transparent)]
TxWatcher(#[from] WatchTxError),
}
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
config: PendingTransactionConfig,
provider: RootProvider<N>,
}
impl<N: Network> PendingTransactionBuilder<N> {
pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
}
pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
Self { config, provider }
}
pub const fn inner(&self) -> &PendingTransactionConfig {
&self.config
}
pub fn into_inner(self) -> PendingTransactionConfig {
self.config
}
pub const fn provider(&self) -> &RootProvider<N> {
&self.provider
}
pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
(self.provider, self.config)
}
pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
f(&self);
self
}
#[doc(alias = "transaction_hash")]
pub const fn tx_hash(&self) -> &TxHash {
self.config.tx_hash()
}
#[doc(alias = "set_transaction_hash")]
pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
self.config.set_tx_hash(tx_hash);
}
#[doc(alias = "with_transaction_hash")]
pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
self.config.tx_hash = tx_hash;
self
}
#[doc(alias = "confirmations")]
pub const fn required_confirmations(&self) -> u64 {
self.config.required_confirmations()
}
#[doc(alias = "set_confirmations")]
pub const fn set_required_confirmations(&mut self, confirmations: u64) {
self.config.set_required_confirmations(confirmations);
}
#[doc(alias = "with_confirmations")]
pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
self.config.required_confirmations = confirmations;
self
}
pub const fn timeout(&self) -> Option<Duration> {
self.config.timeout()
}
pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
self.config.set_timeout(timeout);
}
pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
self.config.timeout = timeout;
self
}
#[doc(alias = "build")]
pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
self.provider.watch_pending_transaction(self.config).await
}
pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
self.register().await?.await
}
pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
let hash = self.config.tx_hash;
let required_confirmations = self.config.required_confirmations;
let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
let mut interval = if required_confirmations > 1 {
None
} else {
Some(interval(self.provider.client().poll_interval()))
};
loop {
let mut confirmed = false;
let tick_fut = if let Some(interval) = interval.as_mut() {
interval.tick().map(|_| ()).left_future()
} else {
pending::<()>().right_future()
};
select! {
_ = tick_fut => {},
res = &mut pending_tx => {
let _ = res?;
confirmed = true;
}
}
let receipt = self.provider.get_transaction_receipt(hash).await?;
if let Some(receipt) = receipt {
return Ok(receipt);
}
if confirmed {
return Err(RpcError::NullResp.into());
}
}
}
}
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
#[doc(alias = "transaction_hash")]
tx_hash: TxHash,
required_confirmations: u64,
timeout: Option<Duration>,
}
impl PendingTransactionConfig {
pub const fn new(tx_hash: TxHash) -> Self {
Self { tx_hash, required_confirmations: 1, timeout: None }
}
#[doc(alias = "transaction_hash")]
pub const fn tx_hash(&self) -> &TxHash {
&self.tx_hash
}
#[doc(alias = "set_transaction_hash")]
pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
self.tx_hash = tx_hash;
}
#[doc(alias = "with_transaction_hash")]
pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
self.tx_hash = tx_hash;
self
}
#[doc(alias = "confirmations")]
pub const fn required_confirmations(&self) -> u64 {
self.required_confirmations
}
#[doc(alias = "set_confirmations")]
pub const fn set_required_confirmations(&mut self, confirmations: u64) {
self.required_confirmations = confirmations;
}
#[doc(alias = "with_confirmations")]
pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
self.required_confirmations = confirmations;
self
}
pub const fn timeout(&self) -> Option<Duration> {
self.timeout
}
pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
self.timeout = timeout;
}
pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
pub const fn with_provider<N: Network>(
self,
provider: RootProvider<N>,
) -> PendingTransactionBuilder<N> {
PendingTransactionBuilder::from_config(provider, self)
}
}
impl From<TxHash> for PendingTransactionConfig {
fn from(tx_hash: TxHash) -> Self {
Self::new(tx_hash)
}
}
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
#[error("transaction was not confirmed within the timeout")]
Timeout,
}
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
config: PendingTransactionConfig,
received_at_block: Option<u64>,
tx: oneshot::Sender<Result<(), WatchTxError>>,
}
impl TxWatcher {
fn notify(self, result: Result<(), WatchTxError>) {
debug!(tx=%self.config.tx_hash, "notifying");
let _ = self.tx.send(result);
}
}
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
#[doc(alias = "transaction_hash")]
pub(crate) tx_hash: TxHash,
pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
impl fmt::Debug for PendingTransaction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
}
}
impl PendingTransaction {
pub fn ready(tx_hash: TxHash) -> Self {
let (tx, rx) = oneshot::channel();
tx.send(Ok(())).ok(); Self { tx_hash, rx }
}
#[doc(alias = "transaction_hash")]
pub const fn tx_hash(&self) -> &TxHash {
&self.tx_hash
}
}
impl Future for PendingTransaction {
type Output = Result<TxHash, PendingTransactionError>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.rx.poll_unpin(cx).map(|res| {
res??;
Ok(self.tx_hash)
})
}
}
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle {
tx: mpsc::Sender<TxWatcher>,
}
impl HeartbeatHandle {
#[doc(alias = "watch_transaction")]
pub(crate) async fn watch_tx(
&self,
config: PendingTransactionConfig,
received_at_block: Option<u64>,
) -> Result<PendingTransaction, PendingTransactionConfig> {
let (tx, rx) = oneshot::channel();
let tx_hash = config.tx_hash;
match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
Err(e) => Err(e.0.config),
}
}
}
pub(crate) struct Heartbeat<N, S> {
stream: futures::stream::Fuse<S>,
past_blocks: VecDeque<(u64, B256HashSet)>,
unconfirmed: B256HashMap<TxWatcher>,
waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,
reap_at: BTreeMap<Instant, B256>,
paused: Arc<Paused>,
_network: std::marker::PhantomData<N>,
}
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
Self {
stream: stream.fuse(),
past_blocks: Default::default(),
unconfirmed: Default::default(),
waiting_confs: Default::default(),
reap_at: Default::default(),
paused: is_paused,
_network: Default::default(),
}
}
fn check_confirmations(&mut self, current_height: u64) {
let to_keep = self.waiting_confs.split_off(&(current_height + 1));
let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
for watcher in to_notify.into_values().flatten() {
watcher.notify(Ok(()));
}
}
fn next_reap(&self) -> Instant {
self.reap_at
.first_key_value()
.map(|(k, _)| *k)
.unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
}
fn reap_timeouts(&mut self) {
let now = Instant::now();
let to_keep = self.reap_at.split_off(&now);
let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
for tx_hash in to_reap.values() {
if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
debug!(tx=%tx_hash, "reaped");
watcher.notify(Err(WatchTxError::Timeout));
}
}
}
fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
for waiters in self.waiting_confs.values_mut() {
*waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
if let Some(received_at_block) = watcher.received_at_block {
if received_at_block >= new_height {
let hash = watcher.config.tx_hash;
debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed after chain gap");
self.unconfirmed.insert(hash, watcher);
return None;
}
}
Some(watcher)
}).collect();
}
}
fn has_pending_transactions(&self) -> bool {
!self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
}
fn update_pause_state(&mut self) {
let should_pause = !self.has_pending_transactions();
if self.paused.is_paused() != should_pause {
debug!(paused = should_pause, "updating heartbeat pause state");
self.paused.set_paused(should_pause);
}
}
fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
debug!(tx=%to_watch.config.tx_hash, "watching");
trace!(?to_watch.config, ?to_watch.received_at_block);
if let Some(received_at_block) = to_watch.received_at_block {
let confirmations = to_watch.config.required_confirmations;
let confirmed_at = received_at_block + confirmations - 1;
let current_height =
self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
if confirmed_at <= current_height {
to_watch.notify(Ok(()));
} else {
self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
}
return;
}
if let Some(timeout) = to_watch.config.timeout {
self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
}
for (block_height, txs) in self.past_blocks.iter().rev() {
if txs.contains(&to_watch.config.tx_hash) {
let confirmations = to_watch.config.required_confirmations;
let confirmed_at = *block_height + confirmations - 1;
let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
if confirmed_at <= current_height {
to_watch.notify(Ok(()));
} else {
debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
let mut to_watch = to_watch;
if to_watch.received_at_block.is_none() {
to_watch.received_at_block = Some(*block_height);
}
self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
}
return;
}
}
self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
}
fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
let confirmations = watcher.config.required_confirmations;
debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
}
fn handle_new_block(&mut self, block: N::BlockResponse) {
let block_height = block.header().as_ref().number();
debug!(%block_height, "handling block");
const MAX_BLOCKS_TO_RETAIN: usize = 10;
if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
self.past_blocks.pop_front();
}
if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
if *last_height + 1 != block_height {
debug!(block_height, last_height, "reorg/unpause detected");
self.move_reorg_to_unconfirmed(block_height);
self.past_blocks.retain(|(h, _)| *h < block_height);
}
}
self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
let to_check: Vec<_> = block
.transactions()
.hashes()
.filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
.collect();
for mut watcher in to_check {
let confirmations = watcher.config.required_confirmations;
if confirmations <= 1 {
watcher.notify(Ok(()));
continue;
}
if let Some(set_block) = watcher.received_at_block {
warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
} else {
watcher.received_at_block = Some(block_height);
}
self.add_to_waiting_list(watcher, block_height);
}
self.check_confirmations(block_height);
}
}
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
pub(crate) fn spawn(self) -> HeartbeatHandle {
let (task, handle) = self.consume();
task.spawn_task();
handle
}
}
#[cfg(not(target_family = "wasm"))]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
pub(crate) fn spawn(self) -> HeartbeatHandle {
let (task, handle) = self.consume();
task.spawn_task();
handle
}
}
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
let (ix_tx, ixns) = mpsc::channel(64);
(self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
}
async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
'shutdown: loop {
{
self.update_pause_state();
let next_reap = self.next_reap();
let sleep = std::pin::pin!(sleep_until(next_reap.into()));
select! {
biased;
ix_opt = ixns.recv() => match ix_opt {
Some(to_watch) => self.handle_watch_ix(to_watch),
None => break 'shutdown, },
Some(block) = self.stream.next() => {
self.handle_new_block(block);
},
_ = sleep => {},
}
}
self.reap_timeouts();
}
}
}