bitcoins_provider/
watcher.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4 time::Duration,
5};
6
7use futures_core::stream::Stream;
8use futures_util::stream::StreamExt;
9use pin_project::pin_project;
10
11use bitcoins::prelude::*;
12
13use crate::{
14 provider::BtcProvider,
15 utils::{new_interval, StreamLast},
16 ProviderFut, DEFAULT_POLL_INTERVAL,
17};
18
19enum WatcherStates<'a> {
20 WaitingSpends(ProviderFut<'a, Option<TXID>>),
22 Paused(usize, TXID),
23 WaitingMoreConfs(usize, TXID, ProviderFut<'a, Option<usize>>),
25 Completed,
27}
28
29#[pin_project(project = PollingWatcherProj)]
41#[must_use = "streams do nothing unless polled"]
42pub struct PollingWatcher<'a> {
43 outpoint: BitcoinOutpoint,
44 confirmations: usize,
45 state: WatcherStates<'a>,
46 interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
47 provider: &'a dyn BtcProvider,
48}
49
50impl<'a> PollingWatcher<'a> {
51 pub fn new(outpoint: BitcoinOutpoint, provider: &'a dyn BtcProvider) -> Self {
53 let fut = Box::pin(provider.get_outspend(outpoint));
54 Self {
55 outpoint,
56 confirmations: 0,
57 state: WatcherStates::WaitingSpends(fut),
58 interval: Box::new(new_interval(DEFAULT_POLL_INTERVAL)),
59 provider,
60 }
61 }
62
63 pub fn confirmations(mut self, confs: usize) -> Self {
65 self.confirmations = confs;
66 self
67 }
68
69 pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
71 self.interval = Box::new(new_interval(duration.into()));
72 self
73 }
74}
75
76impl StreamLast for PollingWatcher<'_> {}
77
78impl<'a> futures_core::stream::Stream for PollingWatcher<'a> {
79 type Item = (usize, Option<TXID>);
80
81 fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
82 let PollingWatcherProj {
83 outpoint,
84 confirmations,
85 state,
86 interval,
87 provider,
88 } = self.project();
89
90 match state {
91 WatcherStates::WaitingSpends(fut) => {
92 if let Poll::Ready(Ok(Some(txid))) = fut.as_mut().poll(ctx) {
93 if *confirmations > 0 {
94 let fut = Box::pin(provider.get_confs(txid));
96 *state = WatcherStates::WaitingMoreConfs(0, txid, fut);
97 return Poll::Ready(Some((0, Some(txid))));
98 } else {
99 *state = WatcherStates::Completed;
101 ctx.waker().wake_by_ref();
102 return Poll::Ready(Some((0, Some(txid))));
103 }
104 } else {
105 let fut = unpause!(ctx, interval, provider.get_outspend(*outpoint));
107 *state = WatcherStates::WaitingSpends(fut);
108 }
109 }
110 WatcherStates::Paused(previous_confs, txid) => {
111 let fut = unpause!(ctx, interval, provider.get_confs(*txid));
112 *state = WatcherStates::WaitingMoreConfs(*previous_confs, *txid, fut);
113 }
114 WatcherStates::WaitingMoreConfs(previous_confs, txid, fut) => {
115 match futures_util::ready!(fut.as_mut().poll(ctx)) {
116 Ok(None) => {
118 let fut = Box::pin(provider.get_outspend(*outpoint));
119 *state = WatcherStates::WaitingSpends(fut);
120 return Poll::Ready(Some((0, None)));
121 }
122 Ok(Some(confs)) => {
124 if confs > *previous_confs && confs < *confirmations {
126 let t = *txid;
127 *state = WatcherStates::Paused(confs, t);
128 return Poll::Ready(Some((confs, Some(t))));
129 }
130
131 if confs >= *confirmations {
133 let t = *txid;
134 *state = WatcherStates::Completed;
135 ctx.waker().wake_by_ref();
136 return Poll::Ready(Some((confs, Some(t))));
137 }
138 }
139 Err(e) => {
140 if !e.from_parsing() {
141 *state = WatcherStates::Paused(*previous_confs, *txid);
142 return Poll::Pending;
143 }
144 panic!(
146 "Non-network error in pending tx polling. This shouldn't be reachable"
147 );
148 }
149 }
150 }
151 WatcherStates::Completed => {
152 return Poll::Ready(None);
153 }
154 };
155 Poll::Pending
156 }
157}