alloy_provider/
heart.rs

1//! Block heartbeat and pending transaction watcher.
2
3use crate::{blocks::Paused, Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8    map::{B256HashMap, B256HashSet},
9    TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{future::pending, stream::StreamExt, FutureExt, Stream};
13use std::{
14    collections::{BTreeMap, VecDeque},
15    fmt,
16    future::Future,
17    sync::Arc,
18    time::Duration,
19};
20use tokio::{
21    select,
22    sync::{mpsc, oneshot},
23};
24
25#[cfg(all(target_family = "wasm", target_os = "unknown"))]
26use wasmtimer::{
27    std::Instant,
28    tokio::{interval, sleep_until},
29};
30
31#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
32use {
33    std::time::Instant,
34    tokio::time::{interval, sleep_until},
35};
36
37/// Errors which may occur when watching a pending transaction.
38#[derive(Debug, thiserror::Error)]
39pub enum PendingTransactionError {
40    /// Failed to register pending transaction in heartbeat.
41    #[error("failed to register pending transaction to watch")]
42    FailedToRegister,
43
44    /// Underlying transport error.
45    #[error(transparent)]
46    TransportError(#[from] TransportError),
47
48    /// Error occurred while getting response from the heartbeat.
49    #[error(transparent)]
50    Recv(#[from] oneshot::error::RecvError),
51
52    /// Errors that may occur when watching a transaction.
53    #[error(transparent)]
54    TxWatcher(#[from] WatchTxError),
55}
56
57/// A builder for configuring a pending transaction watcher.
58///
59/// # Examples
60///
61/// Send and wait for a transaction to be confirmed 2 times, with a timeout of 60 seconds:
62///
63/// ```no_run
64/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
65/// // Send a transaction, and configure the pending transaction.
66/// let builder = provider.send_transaction(tx)
67///     .await?
68///     .with_required_confirmations(2)
69///     .with_timeout(Some(std::time::Duration::from_secs(60)));
70/// // Register the pending transaction with the provider.
71/// let pending_tx = builder.register().await?;
72/// // Wait for the transaction to be confirmed 2 times.
73/// let tx_hash = pending_tx.await?;
74/// # Ok(())
75/// # }
76/// ```
77///
78/// This can also be more concisely written using `watch`:
79/// ```no_run
80/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
81/// let tx_hash = provider.send_transaction(tx)
82///     .await?
83///     .with_required_confirmations(2)
84///     .with_timeout(Some(std::time::Duration::from_secs(60)))
85///     .watch()
86///     .await?;
87/// # Ok(())
88/// # }
89/// ```
90#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
91#[derive(Debug)]
92#[doc(alias = "PendingTxBuilder")]
93pub struct PendingTransactionBuilder<N: Network> {
94    config: PendingTransactionConfig,
95    provider: RootProvider<N>,
96}
97
98impl<N: Network> PendingTransactionBuilder<N> {
99    /// Creates a new pending transaction builder.
100    pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
101        Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
102    }
103
104    /// Creates a new pending transaction builder from the given configuration.
105    pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
106        Self { config, provider }
107    }
108
109    /// Returns the inner configuration.
110    pub const fn inner(&self) -> &PendingTransactionConfig {
111        &self.config
112    }
113
114    /// Consumes this builder, returning the inner configuration.
115    pub fn into_inner(self) -> PendingTransactionConfig {
116        self.config
117    }
118
119    /// Returns the provider.
120    pub const fn provider(&self) -> &RootProvider<N> {
121        &self.provider
122    }
123
124    /// Consumes this builder, returning the provider and the configuration.
125    pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
126        (self.provider, self.config)
127    }
128
129    /// Calls a function with a reference to the value.
130    pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
131        f(&self);
132        self
133    }
134
135    /// Returns the transaction hash.
136    #[doc(alias = "transaction_hash")]
137    pub const fn tx_hash(&self) -> &TxHash {
138        self.config.tx_hash()
139    }
140
141    /// Sets the transaction hash.
142    #[doc(alias = "set_transaction_hash")]
143    pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
144        self.config.set_tx_hash(tx_hash);
145    }
146
147    /// Sets the transaction hash.
148    #[doc(alias = "with_transaction_hash")]
149    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
150        self.config.tx_hash = tx_hash;
151        self
152    }
153
154    /// Returns the number of confirmations to wait for.
155    #[doc(alias = "confirmations")]
156    pub const fn required_confirmations(&self) -> u64 {
157        self.config.required_confirmations()
158    }
159
160    /// Sets the number of confirmations to wait for.
161    #[doc(alias = "set_confirmations")]
162    pub const fn set_required_confirmations(&mut self, confirmations: u64) {
163        self.config.set_required_confirmations(confirmations);
164    }
165
166    /// Sets the number of confirmations to wait for.
167    #[doc(alias = "with_confirmations")]
168    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
169        self.config.required_confirmations = confirmations;
170        self
171    }
172
173    /// Returns the timeout.
174    pub const fn timeout(&self) -> Option<Duration> {
175        self.config.timeout()
176    }
177
178    /// Sets the timeout.
179    pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
180        self.config.set_timeout(timeout);
181    }
182
183    /// Sets the timeout.
184    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
185        self.config.timeout = timeout;
186        self
187    }
188
189    /// Registers the watching configuration with the provider.
190    ///
191    /// This does not wait for the transaction to be confirmed, but returns a [`PendingTransaction`]
192    /// that can be awaited at a later moment.
193    ///
194    /// See:
195    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
196    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
197    ///   confirmed.
198    #[doc(alias = "build")]
199    pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
200        self.provider.watch_pending_transaction(self.config).await
201    }
202
203    /// Waits for the transaction to confirm with the given number of confirmations.
204    ///
205    /// See:
206    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
207    ///   confirmed.
208    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
209    ///   confirmed.
210    pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
211        self.register().await?.await
212    }
213
214    /// Waits for the transaction to confirm with the given number of confirmations, and
215    /// then fetches its receipt.
216    ///
217    /// Note that this method will call `eth_getTransactionReceipt` on the [**root
218    /// provider**](RootProvider), and not on a specific network provider. This means that any
219    /// overrides or customizations made to the network provider will not be used.
220    ///
221    /// See:
222    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
223    ///   confirmed.
224    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
225    pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
226        let hash = self.config.tx_hash;
227        let required_confirmations = self.config.required_confirmations;
228        let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
229
230        // FIXME: this is a hotfix to prevent a race condition where the heartbeat would miss the
231        // block the tx was mined in. Only apply this for single confirmation to respect the
232        // confirmation setting.
233        let mut interval = if required_confirmations > 1 {
234            None
235        } else {
236            Some(interval(self.provider.client().poll_interval()))
237        };
238
239        loop {
240            let mut confirmed = false;
241
242            // If more than 1 block confirmations is specified then we can rely on the regular
243            // watch_pending_transaction and dont need this workaround for the above mentioned race
244            // condition
245            let tick_fut = if let Some(interval) = interval.as_mut() {
246                interval.tick().map(|_| ()).left_future()
247            } else {
248                pending::<()>().right_future()
249            };
250
251            select! {
252                _ = tick_fut => {},
253                res = &mut pending_tx => {
254                    let _ = res?;
255                    confirmed = true;
256                }
257            }
258
259            // try to fetch the receipt
260            let receipt = self.provider.get_transaction_receipt(hash).await?;
261            if let Some(receipt) = receipt {
262                return Ok(receipt);
263            }
264
265            if confirmed {
266                return Err(RpcError::NullResp.into());
267            }
268        }
269    }
270}
271
272/// Configuration for watching a pending transaction.
273///
274/// This type can be used to create a [`PendingTransactionBuilder`], but in general it is only used
275/// internally.
276#[must_use = "this type does nothing unless you call `with_provider`"]
277#[derive(Clone, Debug)]
278#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
279pub struct PendingTransactionConfig {
280    /// The transaction hash to watch for.
281    #[doc(alias = "transaction_hash")]
282    tx_hash: TxHash,
283
284    /// Require a number of confirmations.
285    required_confirmations: u64,
286
287    /// Optional timeout for the transaction.
288    timeout: Option<Duration>,
289}
290
291impl PendingTransactionConfig {
292    /// Create a new watch for a transaction.
293    pub const fn new(tx_hash: TxHash) -> Self {
294        Self { tx_hash, required_confirmations: 1, timeout: None }
295    }
296
297    /// Returns the transaction hash.
298    #[doc(alias = "transaction_hash")]
299    pub const fn tx_hash(&self) -> &TxHash {
300        &self.tx_hash
301    }
302
303    /// Sets the transaction hash.
304    #[doc(alias = "set_transaction_hash")]
305    pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
306        self.tx_hash = tx_hash;
307    }
308
309    /// Sets the transaction hash.
310    #[doc(alias = "with_transaction_hash")]
311    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
312        self.tx_hash = tx_hash;
313        self
314    }
315
316    /// Returns the number of confirmations to wait for.
317    #[doc(alias = "confirmations")]
318    pub const fn required_confirmations(&self) -> u64 {
319        self.required_confirmations
320    }
321
322    /// Sets the number of confirmations to wait for.
323    #[doc(alias = "set_confirmations")]
324    pub const fn set_required_confirmations(&mut self, confirmations: u64) {
325        self.required_confirmations = confirmations;
326    }
327
328    /// Sets the number of confirmations to wait for.
329    #[doc(alias = "with_confirmations")]
330    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
331        self.required_confirmations = confirmations;
332        self
333    }
334
335    /// Returns the timeout.
336    pub const fn timeout(&self) -> Option<Duration> {
337        self.timeout
338    }
339
340    /// Sets the timeout.
341    pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
342        self.timeout = timeout;
343    }
344
345    /// Sets the timeout.
346    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
347        self.timeout = timeout;
348        self
349    }
350
351    /// Wraps this configuration with a provider to expose watching methods.
352    pub const fn with_provider<N: Network>(
353        self,
354        provider: RootProvider<N>,
355    ) -> PendingTransactionBuilder<N> {
356        PendingTransactionBuilder::from_config(provider, self)
357    }
358}
359
360impl From<TxHash> for PendingTransactionConfig {
361    fn from(tx_hash: TxHash) -> Self {
362        Self::new(tx_hash)
363    }
364}
365
366/// Errors which may occur in heartbeat when watching a transaction.
367#[derive(Debug, thiserror::Error)]
368pub enum WatchTxError {
369    /// Transaction was not confirmed after configured timeout.
370    #[error("transaction was not confirmed within the timeout")]
371    Timeout,
372}
373
374/// The type sent by the [`HeartbeatHandle`] to the [`Heartbeat`] background task.
375#[doc(alias = "TransactionWatcher")]
376struct TxWatcher {
377    config: PendingTransactionConfig,
378    /// The block at which the transaction was received. To be filled once known.
379    /// Invariant: any confirmed transaction in `Heart` has this value set.
380    received_at_block: Option<u64>,
381    tx: oneshot::Sender<Result<(), WatchTxError>>,
382}
383
384impl TxWatcher {
385    /// Notify the waiter.
386    fn notify(self, result: Result<(), WatchTxError>) {
387        debug!(tx=%self.config.tx_hash, "notifying");
388        let _ = self.tx.send(result);
389    }
390}
391
392/// Represents a transaction that is yet to be confirmed a specified number of times.
393///
394/// This struct is a future created by [`PendingTransactionBuilder`] that resolves to the
395/// transaction hash once the underlying transaction has been confirmed the specified number of
396/// times in the network.
397#[doc(alias = "PendingTx", alias = "TxPending")]
398pub struct PendingTransaction {
399    /// The transaction hash.
400    #[doc(alias = "transaction_hash")]
401    pub(crate) tx_hash: TxHash,
402    /// The receiver for the notification.
403    // TODO: send a receipt?
404    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
405}
406
407impl fmt::Debug for PendingTransaction {
408    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409        f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
410    }
411}
412
413impl PendingTransaction {
414    /// Creates a ready pending transaction.
415    pub fn ready(tx_hash: TxHash) -> Self {
416        let (tx, rx) = oneshot::channel();
417        tx.send(Ok(())).ok(); // Make sure that the receiver is notified already.
418        Self { tx_hash, rx }
419    }
420
421    /// Returns this transaction's hash.
422    #[doc(alias = "transaction_hash")]
423    pub const fn tx_hash(&self) -> &TxHash {
424        &self.tx_hash
425    }
426}
427
428impl Future for PendingTransaction {
429    type Output = Result<TxHash, PendingTransactionError>;
430
431    fn poll(
432        mut self: std::pin::Pin<&mut Self>,
433        cx: &mut std::task::Context<'_>,
434    ) -> std::task::Poll<Self::Output> {
435        self.rx.poll_unpin(cx).map(|res| {
436            res??;
437            Ok(self.tx_hash)
438        })
439    }
440}
441
442/// A handle to the heartbeat task.
443#[derive(Clone, Debug)]
444pub(crate) struct HeartbeatHandle {
445    tx: mpsc::Sender<TxWatcher>,
446}
447
448impl HeartbeatHandle {
449    /// Watch for a transaction to be confirmed with the given config.
450    #[doc(alias = "watch_transaction")]
451    pub(crate) async fn watch_tx(
452        &self,
453        config: PendingTransactionConfig,
454        received_at_block: Option<u64>,
455    ) -> Result<PendingTransaction, PendingTransactionConfig> {
456        let (tx, rx) = oneshot::channel();
457        let tx_hash = config.tx_hash;
458        match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
459            Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
460            Err(e) => Err(e.0.config),
461        }
462    }
463}
464
465/// A heartbeat task that receives blocks and watches for transactions.
466pub(crate) struct Heartbeat<N, S> {
467    /// The stream of incoming blocks to watch.
468    stream: futures::stream::Fuse<S>,
469
470    /// Lookbehind blocks in form of mapping block number -> vector of transaction hashes.
471    past_blocks: VecDeque<(u64, B256HashSet)>,
472
473    /// Transactions to watch for.
474    unconfirmed: B256HashMap<TxWatcher>,
475
476    /// Ordered map of transactions waiting for confirmations.
477    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,
478
479    /// Ordered map of transactions to reap at a certain time.
480    reap_at: BTreeMap<Instant, B256>,
481
482    /// Whether the heartbeat is currently paused.
483    paused: Arc<Paused>,
484
485    _network: std::marker::PhantomData<N>,
486}
487
488impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
489    /// Create a new heartbeat task.
490    pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
491        Self {
492            stream: stream.fuse(),
493            past_blocks: Default::default(),
494            unconfirmed: Default::default(),
495            waiting_confs: Default::default(),
496            reap_at: Default::default(),
497            paused: is_paused,
498            _network: Default::default(),
499        }
500    }
501
502    /// Check if any transactions have enough confirmations to notify.
503    fn check_confirmations(&mut self, current_height: u64) {
504        let to_keep = self.waiting_confs.split_off(&(current_height + 1));
505        let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
506        for watcher in to_notify.into_values().flatten() {
507            watcher.notify(Ok(()));
508        }
509    }
510
511    /// Get the next time to reap a transaction. If no reaps, this is a very
512    /// long time from now (i.e. will not be woken).
513    fn next_reap(&self) -> Instant {
514        self.reap_at
515            .first_key_value()
516            .map(|(k, _)| *k)
517            .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
518    }
519
520    /// Reap any timeout
521    fn reap_timeouts(&mut self) {
522        let now = Instant::now();
523        let to_keep = self.reap_at.split_off(&now);
524        let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
525
526        for tx_hash in to_reap.values() {
527            if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
528                debug!(tx=%tx_hash, "reaped");
529                watcher.notify(Err(WatchTxError::Timeout));
530            }
531        }
532    }
533
534    /// Reap transactions overridden by a chain gap (true reorg or resync after a pause).
535    /// Accepts new chain height as an argument, and drops any subscriptions
536    /// that were received in blocks affected by the reorg (e.g. >= new_height).
537    fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
538        for waiters in self.waiting_confs.values_mut() {
539            *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
540                if let Some(received_at_block) = watcher.received_at_block {
541                    // All blocks after and _including_ the new height are reaped.
542                    if received_at_block >= new_height {
543                        let hash = watcher.config.tx_hash;
544                        debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed after chain gap");
545                        self.unconfirmed.insert(hash, watcher);
546                        return None;
547                    }
548                }
549                Some(watcher)
550            }).collect();
551        }
552    }
553
554    /// Check if we have any pending transactions.
555    fn has_pending_transactions(&self) -> bool {
556        !self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
557    }
558
559    /// Update the pause state based on whether we have pending transactions.
560    fn update_pause_state(&mut self) {
561        let should_pause = !self.has_pending_transactions();
562        if self.paused.is_paused() != should_pause {
563            debug!(paused = should_pause, "updating heartbeat pause state");
564            self.paused.set_paused(should_pause);
565        }
566    }
567
568    /// Handle a watch instruction by adding it to the watch list, and
569    /// potentially adding it to our `reap_at` list.
570    fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
571        // Start watching for the transaction.
572        debug!(tx=%to_watch.config.tx_hash, "watching");
573        trace!(?to_watch.config, ?to_watch.received_at_block);
574        if let Some(received_at_block) = to_watch.received_at_block {
575            // Transaction is already confirmed, we just need to wait for the required
576            // confirmations.
577            let confirmations = to_watch.config.required_confirmations;
578            let confirmed_at = received_at_block + confirmations - 1;
579            let current_height =
580                self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
581
582            if confirmed_at <= current_height {
583                to_watch.notify(Ok(()));
584            } else {
585                self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
586            }
587            return;
588        }
589
590        if let Some(timeout) = to_watch.config.timeout {
591            self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
592        }
593        // Transaction may be confirmed already, check the lookbehind history first.
594        // If so, insert it into the waiting list.
595        for (block_height, txs) in self.past_blocks.iter().rev() {
596            if txs.contains(&to_watch.config.tx_hash) {
597                let confirmations = to_watch.config.required_confirmations;
598                let confirmed_at = *block_height + confirmations - 1;
599                let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
600
601                if confirmed_at <= current_height {
602                    to_watch.notify(Ok(()));
603                } else {
604                    debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
605                    // Ensure reorg handling can move this watcher back if needed.
606                    let mut to_watch = to_watch;
607                    if to_watch.received_at_block.is_none() {
608                        to_watch.received_at_block = Some(*block_height);
609                    }
610                    self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
611                }
612                return;
613            }
614        }
615
616        self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
617    }
618
619    fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
620        let confirmations = watcher.config.required_confirmations;
621        debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
622        self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
623    }
624
625    /// Handle a new block by checking if any of the transactions we're
626    /// watching are in it, and if so, notifying the watcher. Also updates
627    /// the latest block.
628    fn handle_new_block(&mut self, block: N::BlockResponse) {
629        let block_height = block.header().as_ref().number();
630        debug!(%block_height, "handling block");
631
632        // Add the block the lookbehind.
633        // The value is chosen arbitrarily to not have a huge memory footprint but still
634        // catch most cases where user subscribes for an already mined transaction.
635        // Note that we expect provider to check whether transaction is already mined
636        // before subscribing, so here we only need to consider time before sending a notification
637        // and processing it.
638        const MAX_BLOCKS_TO_RETAIN: usize = 10;
639        if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
640            self.past_blocks.pop_front();
641        }
642        if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
643            // Check that the chain is continuous.
644            if *last_height + 1 != block_height {
645                // Move all the transactions that were reset by the reorg to the unconfirmed list.
646                // This can also happen if we unpaused the heartbeat after some time.
647                debug!(block_height, last_height, "reorg/unpause detected");
648                self.move_reorg_to_unconfirmed(block_height);
649                // Remove past blocks that are now invalid.
650                self.past_blocks.retain(|(h, _)| *h < block_height);
651            }
652        }
653        self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
654
655        // Check if we are watching for any of the transactions in this block.
656        let to_check: Vec<_> = block
657            .transactions()
658            .hashes()
659            .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
660            .collect();
661        for mut watcher in to_check {
662            // If `confirmations` is not more than 1 we can notify the watcher immediately.
663            let confirmations = watcher.config.required_confirmations;
664            if confirmations <= 1 {
665                watcher.notify(Ok(()));
666                continue;
667            }
668            // Otherwise add it to the waiting list.
669
670            // Set the block at which the transaction was received.
671            if let Some(set_block) = watcher.received_at_block {
672                warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
673                // We don't override the set value.
674            } else {
675                watcher.received_at_block = Some(block_height);
676            }
677            self.add_to_waiting_list(watcher, block_height);
678        }
679
680        self.check_confirmations(block_height);
681    }
682}
683
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Starts the heartbeat as a background task and returns the [`HeartbeatHandle`] used to
    /// send it watch instructions.
    pub(crate) fn spawn(self) -> HeartbeatHandle {
        let (heartbeat_fut, handle) = self.consume();
        heartbeat_fut.spawn_task();
        handle
    }
}
693
694#[cfg(not(target_family = "wasm"))]
695impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
696    /// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
697    pub(crate) fn spawn(self) -> HeartbeatHandle {
698        let (task, handle) = self.consume();
699        task.spawn_task();
700        handle
701    }
702}
703
704impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
705    fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
706        let (ix_tx, ixns) = mpsc::channel(64);
707        (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
708    }
709
710    async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
711        'shutdown: loop {
712            {
713                self.update_pause_state();
714
715                let next_reap = self.next_reap();
716                let sleep = std::pin::pin!(sleep_until(next_reap.into()));
717
718                // We bias the select so that we always handle new messages
719                // before checking blocks, and reap timeouts are last.
720                select! {
721                    biased;
722
723                    // Watch for new transactions.
724                    ix_opt = ixns.recv() => match ix_opt {
725                        Some(to_watch) => self.handle_watch_ix(to_watch),
726                        None => break 'shutdown, // ix channel is closed
727                    },
728
729                    // Wake up to handle new blocks.
730                    Some(block) = self.stream.next() => {
731                        self.handle_new_block(block);
732                    },
733
734                    // This arm ensures we always wake up to reap timeouts,
735                    // even if there are no other events.
736                    _ = sleep => {},
737                }
738            }
739
740            // Always reap timeouts
741            self.reap_timeouts();
742        }
743    }
744}