1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::{
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;

use bitcoins::prelude::*;

use crate::{
    provider::BtcProvider,
    utils::{new_interval, StreamLast},
    ProviderFut, DEFAULT_POLL_INTERVAL,
};

enum WatcherStates<'a> {
    // Waiting for a tx to spend
    WaitingSpends(ProviderFut<'a, Option<TXID>>),
    Paused(usize, TXID),
    // Tx known, getting confs
    WaitingMoreConfs(usize, TXID, ProviderFut<'a, Option<usize>>),
    // Future has completed, and should panic if polled again
    Completed,
}

/// A stream that monitors a UTXO by its outpoint. Periodically polls the API to see if the UTXO
/// has been spent. Due to API limitations, if the spending transaction receives a confirmation
/// before the first poll, 0 confirmations will be reported.
///
/// This struct implements `futures::stream::Stream`.
///
/// When used as a `Stream`, the stream will produce a value when a tx has been broadcast, and
/// each time the poller sees the number of confirmations increase. After receiving
/// `>= self.confirmations` confirmations, the stream will finish.
///
/// To get a future yielding a single event when the stream ends, use `StreamLast::last()`
#[pin_project(project = PollingWatcherProj)]
#[must_use = "streams do nothing unless polled"]
pub struct PollingWatcher<'a> {
    outpoint: BitcoinOutpoint,
    confirmations: usize,
    state: WatcherStates<'a>,
    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
    provider: &'a dyn BtcProvider,
}

impl<'a> PollingWatcher<'a> {
    /// Creates a new outspend poller
    pub fn new(outpoint: BitcoinOutpoint, provider: &'a dyn BtcProvider) -> Self {
        let fut = Box::pin(provider.get_outspend(outpoint));
        Self {
            outpoint,
            confirmations: 0,
            state: WatcherStates::WaitingSpends(fut),
            interval: Box::new(new_interval(DEFAULT_POLL_INTERVAL)),
            provider,
        }
    }

    /// Sets the number of confirmations before being notified of the spend
    pub fn confirmations(mut self, confs: usize) -> Self {
        self.confirmations = confs;
        self
    }

    /// Sets the polling interval
    pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
        self.interval = Box::new(new_interval(duration.into()));
        self
    }
}

impl StreamLast for PollingWatcher<'_> {}

impl<'a> futures_core::stream::Stream for PollingWatcher<'a> {
    type Item = (usize, Option<TXID>);

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
        let PollingWatcherProj {
            outpoint,
            confirmations,
            state,
            interval,
            provider,
        } = self.project();

        match state {
            WatcherStates::WaitingSpends(fut) => {
                if let Poll::Ready(Ok(Some(txid))) = fut.as_mut().poll(ctx) {
                    if *confirmations > 0 {
                        // if we need >0 confs start waiting for more
                        let fut = Box::pin(provider.get_confs(txid));
                        *state = WatcherStates::WaitingMoreConfs(0, txid, fut);
                        return Poll::Ready(Some((0, Some(txid))));
                    } else {
                        // if 0 confs, end the stream on the first seen tx
                        *state = WatcherStates::Completed;
                        ctx.waker().wake_by_ref();
                        return Poll::Ready(Some((0, Some(txid))));
                    }
                } else {
                    // Continue otherwise
                    let fut = unpause!(ctx, interval, provider.get_outspend(*outpoint));
                    *state = WatcherStates::WaitingSpends(fut);
                }
            }
            WatcherStates::Paused(previous_confs, txid) => {
                let fut = unpause!(ctx, interval, provider.get_confs(*txid));
                *state = WatcherStates::WaitingMoreConfs(*previous_confs, *txid, fut);
            }
            WatcherStates::WaitingMoreConfs(previous_confs, txid, fut) => {
                match futures_util::ready!(fut.as_mut().poll(ctx)) {
                    // Spend tx has dropped from the mempool. Go back to `WaitingSpends`
                    Ok(None) => {
                        let fut = Box::pin(provider.get_outspend(*outpoint));
                        *state = WatcherStates::WaitingSpends(fut);
                        return Poll::Ready(Some((0, None)));
                    }
                    // Spend tx has confs. Check if there are any new ones
                    Ok(Some(confs)) => {
                        // If we're not at our limit, pause for the interval
                        if confs > *previous_confs && confs < *confirmations {
                            let t = *txid;
                            *state = WatcherStates::Paused(confs, t);
                            return Poll::Ready(Some((confs, Some(t))));
                        }

                        // If we have enough confs, go to completed
                        if confs >= *confirmations {
                            let t = *txid;
                            *state = WatcherStates::Completed;
                            ctx.waker().wake_by_ref();
                            return Poll::Ready(Some((confs, Some(t))));
                        }
                    }
                    Err(e) => {
                        if !e.from_parsing() {
                            *state = WatcherStates::Paused(*previous_confs, *txid);
                            return Poll::Pending;
                        }
                        // TODO: handle better?
                        panic!(
                            "Non-network error in pending tx polling. This shouldn't be reachable"
                        );
                    }
                }
            }
            WatcherStates::Completed => {
                return Poll::Ready(None);
            }
        };
        Poll::Pending
    }
}