1use crate::{blocks::Paused, Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8 map::{B256HashMap, B256HashSet},
9 TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{future::pending, stream::StreamExt, FutureExt, Stream};
13use std::{
14 collections::{BTreeMap, VecDeque},
15 fmt,
16 future::Future,
17 sync::Arc,
18 time::Duration,
19};
20use tokio::{
21 select,
22 sync::{mpsc, oneshot},
23};
24
25#[cfg(all(target_family = "wasm", target_os = "unknown"))]
26use wasmtimer::{
27 std::Instant,
28 tokio::{interval, sleep_until},
29};
30
31#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
32use {
33 std::time::Instant,
34 tokio::time::{interval, sleep_until},
35};
36
37#[derive(Debug, thiserror::Error)]
39pub enum PendingTransactionError {
40 #[error("failed to register pending transaction to watch")]
42 FailedToRegister,
43
44 #[error(transparent)]
46 TransportError(#[from] TransportError),
47
48 #[error(transparent)]
50 Recv(#[from] oneshot::error::RecvError),
51
52 #[error(transparent)]
54 TxWatcher(#[from] WatchTxError),
55}
56
57#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
91#[derive(Debug)]
92#[doc(alias = "PendingTxBuilder")]
93pub struct PendingTransactionBuilder<N: Network> {
94 config: PendingTransactionConfig,
95 provider: RootProvider<N>,
96}
97
98impl<N: Network> PendingTransactionBuilder<N> {
99 pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
101 Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
102 }
103
104 pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
106 Self { config, provider }
107 }
108
109 pub const fn inner(&self) -> &PendingTransactionConfig {
111 &self.config
112 }
113
114 pub fn into_inner(self) -> PendingTransactionConfig {
116 self.config
117 }
118
119 pub const fn provider(&self) -> &RootProvider<N> {
121 &self.provider
122 }
123
124 pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
126 (self.provider, self.config)
127 }
128
129 pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
131 f(&self);
132 self
133 }
134
135 #[doc(alias = "transaction_hash")]
137 pub const fn tx_hash(&self) -> &TxHash {
138 self.config.tx_hash()
139 }
140
141 #[doc(alias = "set_transaction_hash")]
143 pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
144 self.config.set_tx_hash(tx_hash);
145 }
146
147 #[doc(alias = "with_transaction_hash")]
149 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
150 self.config.tx_hash = tx_hash;
151 self
152 }
153
154 #[doc(alias = "confirmations")]
156 pub const fn required_confirmations(&self) -> u64 {
157 self.config.required_confirmations()
158 }
159
160 #[doc(alias = "set_confirmations")]
162 pub const fn set_required_confirmations(&mut self, confirmations: u64) {
163 self.config.set_required_confirmations(confirmations);
164 }
165
166 #[doc(alias = "with_confirmations")]
168 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
169 self.config.required_confirmations = confirmations;
170 self
171 }
172
173 pub const fn timeout(&self) -> Option<Duration> {
175 self.config.timeout()
176 }
177
178 pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
180 self.config.set_timeout(timeout);
181 }
182
183 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
185 self.config.timeout = timeout;
186 self
187 }
188
189 #[doc(alias = "build")]
199 pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
200 self.provider.watch_pending_transaction(self.config).await
201 }
202
203 pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
211 self.register().await?.await
212 }
213
214 pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
226 let hash = self.config.tx_hash;
227 let required_confirmations = self.config.required_confirmations;
228 let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
229
230 let mut interval = if required_confirmations > 1 {
234 None
235 } else {
236 Some(interval(self.provider.client().poll_interval()))
237 };
238
239 loop {
240 let mut confirmed = false;
241
242 let tick_fut = if let Some(interval) = interval.as_mut() {
246 interval.tick().map(|_| ()).left_future()
247 } else {
248 pending::<()>().right_future()
249 };
250
251 select! {
252 _ = tick_fut => {},
253 res = &mut pending_tx => {
254 let _ = res?;
255 confirmed = true;
256 }
257 }
258
259 let receipt = self.provider.get_transaction_receipt(hash).await?;
261 if let Some(receipt) = receipt {
262 return Ok(receipt);
263 }
264
265 if confirmed {
266 return Err(RpcError::NullResp.into());
267 }
268 }
269 }
270}
271
272#[must_use = "this type does nothing unless you call `with_provider`"]
277#[derive(Clone, Debug)]
278#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
279pub struct PendingTransactionConfig {
280 #[doc(alias = "transaction_hash")]
282 tx_hash: TxHash,
283
284 required_confirmations: u64,
286
287 timeout: Option<Duration>,
289}
290
291impl PendingTransactionConfig {
292 pub const fn new(tx_hash: TxHash) -> Self {
294 Self { tx_hash, required_confirmations: 1, timeout: None }
295 }
296
297 #[doc(alias = "transaction_hash")]
299 pub const fn tx_hash(&self) -> &TxHash {
300 &self.tx_hash
301 }
302
303 #[doc(alias = "set_transaction_hash")]
305 pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
306 self.tx_hash = tx_hash;
307 }
308
309 #[doc(alias = "with_transaction_hash")]
311 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
312 self.tx_hash = tx_hash;
313 self
314 }
315
316 #[doc(alias = "confirmations")]
318 pub const fn required_confirmations(&self) -> u64 {
319 self.required_confirmations
320 }
321
322 #[doc(alias = "set_confirmations")]
324 pub const fn set_required_confirmations(&mut self, confirmations: u64) {
325 self.required_confirmations = confirmations;
326 }
327
328 #[doc(alias = "with_confirmations")]
330 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
331 self.required_confirmations = confirmations;
332 self
333 }
334
335 pub const fn timeout(&self) -> Option<Duration> {
337 self.timeout
338 }
339
340 pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
342 self.timeout = timeout;
343 }
344
345 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
347 self.timeout = timeout;
348 self
349 }
350
351 pub const fn with_provider<N: Network>(
353 self,
354 provider: RootProvider<N>,
355 ) -> PendingTransactionBuilder<N> {
356 PendingTransactionBuilder::from_config(provider, self)
357 }
358}
359
360impl From<TxHash> for PendingTransactionConfig {
361 fn from(tx_hash: TxHash) -> Self {
362 Self::new(tx_hash)
363 }
364}
365
366#[derive(Debug, thiserror::Error)]
368pub enum WatchTxError {
369 #[error("transaction was not confirmed within the timeout")]
371 Timeout,
372}
373
374#[doc(alias = "TransactionWatcher")]
376struct TxWatcher {
377 config: PendingTransactionConfig,
378 received_at_block: Option<u64>,
381 tx: oneshot::Sender<Result<(), WatchTxError>>,
382}
383
384impl TxWatcher {
385 fn notify(self, result: Result<(), WatchTxError>) {
387 debug!(tx=%self.config.tx_hash, "notifying");
388 let _ = self.tx.send(result);
389 }
390}
391
392#[doc(alias = "PendingTx", alias = "TxPending")]
398pub struct PendingTransaction {
399 #[doc(alias = "transaction_hash")]
401 pub(crate) tx_hash: TxHash,
402 pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
405}
406
407impl fmt::Debug for PendingTransaction {
408 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
409 f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
410 }
411}
412
413impl PendingTransaction {
414 pub fn ready(tx_hash: TxHash) -> Self {
416 let (tx, rx) = oneshot::channel();
417 tx.send(Ok(())).ok(); Self { tx_hash, rx }
419 }
420
421 #[doc(alias = "transaction_hash")]
423 pub const fn tx_hash(&self) -> &TxHash {
424 &self.tx_hash
425 }
426}
427
428impl Future for PendingTransaction {
429 type Output = Result<TxHash, PendingTransactionError>;
430
431 fn poll(
432 mut self: std::pin::Pin<&mut Self>,
433 cx: &mut std::task::Context<'_>,
434 ) -> std::task::Poll<Self::Output> {
435 self.rx.poll_unpin(cx).map(|res| {
436 res??;
437 Ok(self.tx_hash)
438 })
439 }
440}
441
442#[derive(Clone, Debug)]
444pub(crate) struct HeartbeatHandle {
445 tx: mpsc::Sender<TxWatcher>,
446}
447
448impl HeartbeatHandle {
449 #[doc(alias = "watch_transaction")]
451 pub(crate) async fn watch_tx(
452 &self,
453 config: PendingTransactionConfig,
454 received_at_block: Option<u64>,
455 ) -> Result<PendingTransaction, PendingTransactionConfig> {
456 let (tx, rx) = oneshot::channel();
457 let tx_hash = config.tx_hash;
458 match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
459 Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
460 Err(e) => Err(e.0.config),
461 }
462 }
463}
464
465pub(crate) struct Heartbeat<N, S> {
467 stream: futures::stream::Fuse<S>,
469
470 past_blocks: VecDeque<(u64, B256HashSet)>,
472
473 unconfirmed: B256HashMap<TxWatcher>,
475
476 waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,
478
479 reap_at: BTreeMap<Instant, B256>,
481
482 paused: Arc<Paused>,
484
485 _network: std::marker::PhantomData<N>,
486}
487
488impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
489 pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
491 Self {
492 stream: stream.fuse(),
493 past_blocks: Default::default(),
494 unconfirmed: Default::default(),
495 waiting_confs: Default::default(),
496 reap_at: Default::default(),
497 paused: is_paused,
498 _network: Default::default(),
499 }
500 }
501
502 fn check_confirmations(&mut self, current_height: u64) {
504 let to_keep = self.waiting_confs.split_off(&(current_height + 1));
505 let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
506 for watcher in to_notify.into_values().flatten() {
507 watcher.notify(Ok(()));
508 }
509 }
510
511 fn next_reap(&self) -> Instant {
514 self.reap_at
515 .first_key_value()
516 .map(|(k, _)| *k)
517 .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
518 }
519
520 fn reap_timeouts(&mut self) {
522 let now = Instant::now();
523 let to_keep = self.reap_at.split_off(&now);
524 let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
525
526 for tx_hash in to_reap.values() {
527 if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
528 debug!(tx=%tx_hash, "reaped");
529 watcher.notify(Err(WatchTxError::Timeout));
530 }
531 }
532 }
533
534 fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
538 for waiters in self.waiting_confs.values_mut() {
539 *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
540 if let Some(received_at_block) = watcher.received_at_block {
541 if received_at_block >= new_height {
543 let hash = watcher.config.tx_hash;
544 debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed after chain gap");
545 self.unconfirmed.insert(hash, watcher);
546 return None;
547 }
548 }
549 Some(watcher)
550 }).collect();
551 }
552 }
553
554 fn has_pending_transactions(&self) -> bool {
556 !self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
557 }
558
559 fn update_pause_state(&mut self) {
561 let should_pause = !self.has_pending_transactions();
562 if self.paused.is_paused() != should_pause {
563 debug!(paused = should_pause, "updating heartbeat pause state");
564 self.paused.set_paused(should_pause);
565 }
566 }
567
568 fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
571 debug!(tx=%to_watch.config.tx_hash, "watching");
573 trace!(?to_watch.config, ?to_watch.received_at_block);
574 if let Some(received_at_block) = to_watch.received_at_block {
575 let confirmations = to_watch.config.required_confirmations;
578 let confirmed_at = received_at_block + confirmations - 1;
579 let current_height =
580 self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
581
582 if confirmed_at <= current_height {
583 to_watch.notify(Ok(()));
584 } else {
585 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
586 }
587 return;
588 }
589
590 if let Some(timeout) = to_watch.config.timeout {
591 self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
592 }
593 for (block_height, txs) in self.past_blocks.iter().rev() {
596 if txs.contains(&to_watch.config.tx_hash) {
597 let confirmations = to_watch.config.required_confirmations;
598 let confirmed_at = *block_height + confirmations - 1;
599 let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
600
601 if confirmed_at <= current_height {
602 to_watch.notify(Ok(()));
603 } else {
604 debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
605 let mut to_watch = to_watch;
607 if to_watch.received_at_block.is_none() {
608 to_watch.received_at_block = Some(*block_height);
609 }
610 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
611 }
612 return;
613 }
614 }
615
616 self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
617 }
618
619 fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
620 let confirmations = watcher.config.required_confirmations;
621 debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
622 self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
623 }
624
625 fn handle_new_block(&mut self, block: N::BlockResponse) {
629 let block_height = block.header().as_ref().number();
630 debug!(%block_height, "handling block");
631
632 const MAX_BLOCKS_TO_RETAIN: usize = 10;
639 if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
640 self.past_blocks.pop_front();
641 }
642 if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
643 if *last_height + 1 != block_height {
645 debug!(block_height, last_height, "reorg/unpause detected");
648 self.move_reorg_to_unconfirmed(block_height);
649 self.past_blocks.retain(|(h, _)| *h < block_height);
651 }
652 }
653 self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
654
655 let to_check: Vec<_> = block
657 .transactions()
658 .hashes()
659 .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
660 .collect();
661 for mut watcher in to_check {
662 let confirmations = watcher.config.required_confirmations;
664 if confirmations <= 1 {
665 watcher.notify(Ok(()));
666 continue;
667 }
668 if let Some(set_block) = watcher.received_at_block {
672 warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
673 } else {
675 watcher.received_at_block = Some(block_height);
676 }
677 self.add_to_waiting_list(watcher, block_height);
678 }
679
680 self.check_confirmations(block_height);
681 }
682}
683
684#[cfg(target_family = "wasm")]
685impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
686 pub(crate) fn spawn(self) -> HeartbeatHandle {
688 let (task, handle) = self.consume();
689 task.spawn_task();
690 handle
691 }
692}
693
694#[cfg(not(target_family = "wasm"))]
695impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
696 pub(crate) fn spawn(self) -> HeartbeatHandle {
698 let (task, handle) = self.consume();
699 task.spawn_task();
700 handle
701 }
702}
703
704impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
705 fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
706 let (ix_tx, ixns) = mpsc::channel(64);
707 (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
708 }
709
710 async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
711 'shutdown: loop {
712 {
713 self.update_pause_state();
714
715 let next_reap = self.next_reap();
716 let sleep = std::pin::pin!(sleep_until(next_reap.into()));
717
718 select! {
721 biased;
722
723 ix_opt = ixns.recv() => match ix_opt {
725 Some(to_watch) => self.handle_watch_ix(to_watch),
726 None => break 'shutdown, },
728
729 Some(block) = self.stream.next() => {
731 self.handle_new_block(block);
732 },
733
734 _ = sleep => {},
737 }
738 }
739
740 self.reap_timeouts();
742 }
743 }
744}