bitcoins_provider/
watcher.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4    time::Duration,
5};
6
7use futures_core::stream::Stream;
8use futures_util::stream::StreamExt;
9use pin_project::pin_project;
10
11use bitcoins::prelude::*;
12
13use crate::{
14    provider::BtcProvider,
15    utils::{new_interval, StreamLast},
16    ProviderFut, DEFAULT_POLL_INTERVAL,
17};
18
19enum WatcherStates<'a> {
20    // Waiting for a tx to spend
21    WaitingSpends(ProviderFut<'a, Option<TXID>>),
22    Paused(usize, TXID),
23    // Tx known, getting confs
24    WaitingMoreConfs(usize, TXID, ProviderFut<'a, Option<usize>>),
25    // Future has completed, and should panic if polled again
26    Completed,
27}
28
29/// A stream that monitors a UTXO by its outpoint. Periodically polls the API to see if the UTXO
30/// has been spent. Due to API limitations, if the spending transaction receives a confirmation
31/// before the first poll, 0 confirmations will be reported.
32///
33/// This struct implements `futures::stream::Stream`.
34///
35/// When used as a `Stream`, the stream will produce a value when a tx has been broadcast, and
36/// each time the poller sees the number of confirmations increase. After receiving
37/// `>= self.confirmations` confirmations, the stream will finish.
38///
39/// To get a future yielding a single event when the stream ends, use `StreamLast::last()`
40#[pin_project(project = PollingWatcherProj)]
41#[must_use = "streams do nothing unless polled"]
42pub struct PollingWatcher<'a> {
43    outpoint: BitcoinOutpoint,
44    confirmations: usize,
45    state: WatcherStates<'a>,
46    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
47    provider: &'a dyn BtcProvider,
48}
49
50impl<'a> PollingWatcher<'a> {
51    /// Creates a new outspend poller
52    pub fn new(outpoint: BitcoinOutpoint, provider: &'a dyn BtcProvider) -> Self {
53        let fut = Box::pin(provider.get_outspend(outpoint));
54        Self {
55            outpoint,
56            confirmations: 0,
57            state: WatcherStates::WaitingSpends(fut),
58            interval: Box::new(new_interval(DEFAULT_POLL_INTERVAL)),
59            provider,
60        }
61    }
62
63    /// Sets the number of confirmations before being notified of the spend
64    pub fn confirmations(mut self, confs: usize) -> Self {
65        self.confirmations = confs;
66        self
67    }
68
69    /// Sets the polling interval
70    pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
71        self.interval = Box::new(new_interval(duration.into()));
72        self
73    }
74}
75
76impl StreamLast for PollingWatcher<'_> {}
77
78impl<'a> futures_core::stream::Stream for PollingWatcher<'a> {
79    type Item = (usize, Option<TXID>);
80
81    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
82        let PollingWatcherProj {
83            outpoint,
84            confirmations,
85            state,
86            interval,
87            provider,
88        } = self.project();
89
90        match state {
91            WatcherStates::WaitingSpends(fut) => {
92                if let Poll::Ready(Ok(Some(txid))) = fut.as_mut().poll(ctx) {
93                    if *confirmations > 0 {
94                        // if we need >0 confs start waiting for more
95                        let fut = Box::pin(provider.get_confs(txid));
96                        *state = WatcherStates::WaitingMoreConfs(0, txid, fut);
97                        return Poll::Ready(Some((0, Some(txid))));
98                    } else {
99                        // if 0 confs, end the stream on the first seen tx
100                        *state = WatcherStates::Completed;
101                        ctx.waker().wake_by_ref();
102                        return Poll::Ready(Some((0, Some(txid))));
103                    }
104                } else {
105                    // Continue otherwise
106                    let fut = unpause!(ctx, interval, provider.get_outspend(*outpoint));
107                    *state = WatcherStates::WaitingSpends(fut);
108                }
109            }
110            WatcherStates::Paused(previous_confs, txid) => {
111                let fut = unpause!(ctx, interval, provider.get_confs(*txid));
112                *state = WatcherStates::WaitingMoreConfs(*previous_confs, *txid, fut);
113            }
114            WatcherStates::WaitingMoreConfs(previous_confs, txid, fut) => {
115                match futures_util::ready!(fut.as_mut().poll(ctx)) {
116                    // Spend tx has dropped from the mempool. Go back to `WaitingSpends`
117                    Ok(None) => {
118                        let fut = Box::pin(provider.get_outspend(*outpoint));
119                        *state = WatcherStates::WaitingSpends(fut);
120                        return Poll::Ready(Some((0, None)));
121                    }
122                    // Spend tx has confs. Check if there are any new ones
123                    Ok(Some(confs)) => {
124                        // If we're not at our limit, pause for the interval
125                        if confs > *previous_confs && confs < *confirmations {
126                            let t = *txid;
127                            *state = WatcherStates::Paused(confs, t);
128                            return Poll::Ready(Some((confs, Some(t))));
129                        }
130
131                        // If we have enough confs, go to completed
132                        if confs >= *confirmations {
133                            let t = *txid;
134                            *state = WatcherStates::Completed;
135                            ctx.waker().wake_by_ref();
136                            return Poll::Ready(Some((confs, Some(t))));
137                        }
138                    }
139                    Err(e) => {
140                        if !e.from_parsing() {
141                            *state = WatcherStates::Paused(*previous_confs, *txid);
142                            return Poll::Pending;
143                        }
144                        // TODO: handle better?
145                        panic!(
146                            "Non-network error in pending tx polling. This shouldn't be reachable"
147                        );
148                    }
149                }
150            }
151            WatcherStates::Completed => {
152                return Poll::Ready(None);
153            }
154        };
155        Poll::Pending
156    }
157}