bitcoincore_zmqsequence/
lib.rs

1use crate::errors::ZMQSeqListenerError;
2use std::str;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5use std::sync::mpsc::channel;
6use std::sync::mpsc::Receiver;
7use std::sync::{Arc, Barrier};
8use std::thread;
9use std::thread::JoinHandle;
10
11use url::Url;
12
13pub mod errors;
14
15#[cfg(feature = "check_node")]
16pub mod check;
17
18type Msg = Vec<Vec<u8>>;
19
20/// Enum with all possible ZMQ messages arriving through the receiver
21#[derive(Clone, PartialEq, Eq, Debug)]
22pub enum MempoolSequence {
23    ///A new block has arrived with `block_hash`
24    BlockConnection { block_hash: String, zmq_seq: u32 },
25    ///A block with `block_hash` has been overridden
26    BlockDisconnection { block_hash: String, zmq_seq: u32 },
27    ///A tx with `txid` has been removed from the mempool.
28    TxRemoved {
29        txid: String,
30        mp_seq_num: u64,
31        zmq_seq: u32,
32    },
33    ///A tx with `txid` has been added to the mempool.
34    TxAdded {
35        txid: String,
36        mp_seq_num: u64,
37        zmq_seq: u32,
38    },
39    ///An error has ocurred, and must be handled.
40    SeqError { error: ZMQSeqListenerError },
41    ///First message to be received, shows if bitcoind node was already working when first msg
42    ///arrives (zmq_seq==0)
43    SeqStart { bitcoind_already_working: bool },
44}
45
46impl TryFrom<Msg> for MempoolSequence {
47    type Error = ZMQSeqListenerError;
48
49    fn try_from(msg: Vec<Vec<u8>>) -> Result<Self, Self::Error> {
50        //Safe to do next three unwraps
51        if msg.len() < 3 {
52            return Err(ZMQSeqListenerError::MsgError("msg.len() < 3".to_string()));
53        };
54        let topic = str::from_utf8(msg.get(0).unwrap())?;
55        let body = msg.get(1).unwrap();
56        let zmqseq = msg.get(2).unwrap();
57
58        if topic.ne("sequence") {
59            return Err(ZMQSeqListenerError::TopicError(topic.to_string()));
60        }
61        let zmq_seq = parse_zmq_seq_num(zmqseq)?;
62        let char = *body.get(32).ok_or(ZMQSeqListenerError::MsgError(
63            "Not enough size, expected [u8;33]".to_string(),
64        ))?;
65        match char {
66            // "C"
67            67 => Ok(MempoolSequence::BlockConnection {
68                block_hash: hex::encode(&body[..32]),
69                zmq_seq,
70            }),
71            // "D"
72            68 => Ok(MempoolSequence::BlockDisconnection {
73                block_hash: hex::encode(&body[..32]),
74                zmq_seq,
75            }),
76            // "R"
77            82 => Ok(MempoolSequence::TxRemoved {
78                txid: hex::encode(&body[..32]),
79                mp_seq_num: parse_mp_seq_num(body)?,
80                zmq_seq,
81            }),
82            // "A"
83            65 => Ok(MempoolSequence::TxAdded {
84                txid: hex::encode(&body[..32]),
85                mp_seq_num: parse_mp_seq_num(body)?,
86                zmq_seq,
87            }),
88            ch => Err(ZMQSeqListenerError::CharCodeError(ch.to_string())),
89        }
90    }
91}
92
93fn parse_mp_seq_num(value: &[u8]) -> Result<u64, ZMQSeqListenerError> {
94    let ch: [u8; 8] = match value[33..41].try_into() {
95        Ok(val) => Ok(val),
96        Err(_) => Err(ZMQSeqListenerError::MsgError(
97            "Not enough size, expected [u8;41]".to_string(),
98        )),
99    }?;
100    Ok(u64::from_le_bytes(ch))
101}
102
103fn parse_zmq_seq_num(value: &[u8]) -> Result<u32, ZMQSeqListenerError> {
104    let ch: [u8; 4] = match value[..4].try_into() {
105        Ok(val) => Ok(val),
106        Err(_) => Err(ZMQSeqListenerError::MsgError(
107            "Not enough size, expected [u8;4]".to_string(),
108        )),
109    }?;
110    Ok(u32::from_le_bytes(ch))
111}
112
113/// A bitcoincore ZMQ subscriber for receiving zmqpubsequence as defined [here](https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md)
114///
115/// This is a syncronous library that launches a listening thread which uses a start/stop interface
116/// and receive data via a channel receiver.
117///
118/// [`MempoolSequence::SeqStart`] is the first data received containing whether bitcoind node has
119/// started while we were listening, or the node was already up and running. This is important because in the fist case, the channel receiver will broadcast every block and
120/// tx during node syncronization with the network. As this is normally not intended, use the
121/// [`check::NodeChecker::wait_till_node_ok`] method available when using `check_node` cargo
122/// feature.
123///
124/// After [`MempoolSequence::SeqStart`] a sequence of [`MempoolSequence::BlockConnection`],
125/// [`MempoolSequence::BlockDisconnection`], [`MempoolSequence::TxAdded`] and
126/// [`MempoolSequence::TxRemoved`] follows.
127///
128/// If any error happens, a [`MempoolSequence::SeqError`] will be sent.
129///
130/// Note that this Listener is aware of message interruptions, that is, if a zmq_seq number
131/// is skipped, a [`MempoolSequence::SeqError`] with a [`ZMQSeqListenerError::InvalidSeqNumber`] is
132/// sent.
133///
134/// Example:
135/// ```
136///    let zmqseqlistener = ZmqSeqListener::start(&Url::from_str("tcp://localhost:29000")?)?;
137///    loop{
138///        let kk = zmqseqlistener.rx.recv()?;
139///        println!("{:?}", kk);
140///    }
141///
142/// ```
143pub struct ZmqSeqListener {
144    pub rx: Receiver<MempoolSequence>,
145    pub stop: Arc<AtomicBool>,
146    pub thread: JoinHandle<()>,
147}
148
149impl ZmqSeqListener {
150    ///Starts listening ZMQ messages given the `zmq_address`.
151    pub fn start(zmq_address: &Url) -> Result<Self, ZMQSeqListenerError> {
152        let context = zmq::Context::new();
153        let subscriber = context.socket(zmq::SUB)?;
154        subscriber.connect(zmq_address.as_str())?;
155        subscriber.set_subscribe(b"sequence")?;
156        let stop_th = Arc::new(AtomicBool::new(false));
157        let stop = stop_th.clone();
158        let (tx, rx) = channel();
159        //Use a barrier, Zmq is "slow joiner"
160        let barrier = Arc::new(Barrier::new(2));
161        let barrierc = barrier.clone();
162        let thread = thread::spawn(move || {
163            let mut is_starting = true;
164            let mut last_zmq_seq = 0;
165            while !stop_th.load(Ordering::SeqCst) {
166                let mpsq = match receive_mpsq(&subscriber) {
167                    Ok(mpsq) => mpsq,
168                    Err(e) => MempoolSequence::SeqError { error: e },
169                };
170                if is_starting {
171                    barrier.wait();
172                    tx.send(MempoolSequence::SeqStart {
173                        bitcoind_already_working: check_bitcoind_already_working(&mpsq),
174                    })
175                    .unwrap();
176                    is_starting = false;
177                } else {
178                    let zmq_seq = zmq_seq_from(&mpsq);
179                    if zmq_seq.is_some() {
180                        if zmq_seq.unwrap() != last_zmq_seq + 1 {
181                            tx.send(MempoolSequence::SeqError {
182                                error: ZMQSeqListenerError::InvalidSeqNumber(
183                                    last_zmq_seq + 1,
184                                    zmq_seq.unwrap(),
185                                ),
186                            })
187                            .unwrap();
188                        }
189                    }
190                }
191                last_zmq_seq = zmq_seq_from(&mpsq).unwrap_or(last_zmq_seq);
192                tx.send(mpsq).unwrap();
193            }
194        });
195        barrierc.wait();
196        Ok(ZmqSeqListener { rx, stop, thread })
197    }
198}
199
200fn check_bitcoind_already_working(mpsq: &MempoolSequence) -> bool {
201    matches!(
202        mpsq,
203        MempoolSequence::BlockConnection { zmq_seq, .. }
204            | MempoolSequence::BlockDisconnection { zmq_seq, .. }
205            | MempoolSequence::TxRemoved { zmq_seq, .. }
206            | MempoolSequence::TxAdded { zmq_seq, .. }
207            if *zmq_seq != 0
208    )
209}
210
211fn zmq_seq_from(mpsq: &MempoolSequence) -> Option<u32> {
212    match mpsq {
213        MempoolSequence::BlockConnection { zmq_seq, .. }
214        | MempoolSequence::BlockDisconnection { zmq_seq, .. }
215        | MempoolSequence::TxRemoved { zmq_seq, .. }
216        | MempoolSequence::TxAdded { zmq_seq, .. } => Some(*zmq_seq),
217        _ => None,
218    }
219}
220
221fn receive_mpsq(subscriber: &zmq::Socket) -> Result<MempoolSequence, ZMQSeqListenerError> {
222    let res = {
223        let msg = subscriber.recv_multipart(0)?;
224        let mpsq = MempoolSequence::try_from(msg)?;
225        Ok(mpsq)
226    };
227    res
228}